文章

RedisMQ

Redis实现消息队列

Redis 是一种高性能的内存数据存储工具,支持多种数据结构(如 list、set、pub/sub等),使其成为实现轻量级消息队列的理想选择。

本文将详细介绍 Redis 实现消息队列的几种方式,并提供具体的代码示例。

1. 消息队列的实现方式

1.1 使用 Redis List 结构实现简单消息队列

Redis 的 List 数据结构天然支持 先进先出 (FIFO) 的特性,可以通过 LPUSHRPOP 命令来实现消息的发布与消费。

  • LPUSH:将消息添加到队列的左端。
  • RPOP:从队列的右端弹出消息,实现消费。

优缺点

  • 优点:简单易用,性能高。
  • 缺点:没有消息确认机制,消息可能丢失。

1.2 使用 Redis Pub/Sub 实现实时消息队列

Redis 的 发布/订阅 (Pub/Sub) 功能允许消息的实时广播,多个订阅者可以接收同一条消息。

  • PUBLISH:发布消息。
  • SUBSCRIBE:订阅频道,实时接收消息。

优缺点

  • 优点:实时性强,适合广播消息。
  • 缺点:消息不持久,订阅者掉线后无法接收到历史消息。

1.3 使用 Redis Stream 实现可靠消息队列

Redis 5.0 引入了 Stream 数据结构,可以实现可靠的消息队列,支持:

  • 消息持久化。
  • 消息消费确认机制。
  • 多个消费者组(Consumer Groups)。

优缺点

  • 优点:支持持久化、消息确认机制,适合高可靠场景。
  • 缺点:相对复杂,需要学习新的命令。

2. 示例实现

仓库地址 https://gitea.hhdxw.top/yovinchen/RedisMQTest.git

2.1 使用 List 实现简单消息队列

代码示例:生产者和消费者

生产者:向 List 添加消息

import redis  
import time  
  
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)  
  
# 队列名称  
QUEUE_NAME = "task_queue"  
  
  
# 生产者:向队列中添加消息  
def producer():  
    for i in range(1, 100):  
        message = f"task-{i}"  
        redis_client.lpush(QUEUE_NAME, message)  # 将消息推送到队列左端  
        print(f"Produced: {message}")  
        time.sleep(1)  # 模拟生产消息的间隔  
  
if __name__ == "__main__":  
    import threading  
  
    # 生产者  
    producer_thread = threading.Thread(target=producer)  
    producer_thread.start()  
    producer_thread.join()

消费者:消费 List 中的消息

import redis  
import time  
  
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)  
  
# 队列名称  
QUEUE_NAME = "task_queue"  
  
# 消费者:从队列中获取消息  
def consumer():  
    while True:  
        message = redis_client.rpop(QUEUE_NAME)  # 从队列右端弹出消息  
        if message:  
            print(f"Consumed: {message.decode('utf-8')}")  
        else:  
            print("No messages, waiting...")  
        time.sleep(1)  
  
  
if __name__ == "__main__":  
    import threading  
  
    # 消费者  
    consumer_thread = threading.Thread(target=consumer)  
    consumer_thread.start()  
    consumer_thread.join()

2.2 使用 Pub/Sub 实现实时消息队列

代码示例:发布者和订阅者

发布者(Publisher):发布消息

import redis  
import time  
  
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)  
  
CHANNEL_NAME = "news_channel"  
  
# 发布消息  
def publisher():  
    for i in range(1, 100):  
        message = f"news-{i}"  
        redis_client.publish(CHANNEL_NAME, message)  # 发布消息到频道  
        print(f"Published: {message}")  
        time.sleep(1)  
  
if __name__ == "__main__":  
    publisher()

订阅者(Subscriber):接收消息

import redis  
  
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)  
  
CHANNEL_NAME = "news_channel"  
  
# 订阅频道  
def subscriber():  
    pubsub = redis_client.pubsub()  
    pubsub.subscribe(CHANNEL_NAME)  
    print(f"Subscribed to {CHANNEL_NAME}")  
  
    for message in pubsub.listen():  
        if message['type'] == 'message':  
            print(f"Received: {message['data'].decode('utf-8')}")  
  
if __name__ == "__main__":  
    subscriber()

2.3 使用 Stream 实现可靠消息队列

代码示例:生产者和消费者

生产者:向 Stream 添加消息

import redis  
import time  
  
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)  
  
STREAM_NAME = "task_stream"  
  
# 生产者:向 Stream 中添加消息  
def producer():  
    for i in range(1, 1000):  
        message = {"task": f"task-{i}"}  
        redis_client.xadd(STREAM_NAME, message)  # 添加消息到 Stream        print(f"Produced: {message}")  
        time.sleep(1)  
  
if __name__ == "__main__":  
    producer()

消费者:读取 Stream 中的消息

import redis  
import time  
  
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)  
  
STREAM_NAME = "task_stream"  
  
# 消费者:读取 Stream 中的消息  
def consumer():  
    last_id = "0"  # 从头开始读取消息  
    while True:  
        messages = redis_client.xread({STREAM_NAME: last_id}, count=1, block=1000)  
        if messages:  
            for stream, message_list in messages:  
                for message_id, message in message_list:  
                    print(f"Consumed: ID={message_id}, Message={message}")  
                    last_id = message_id  # 更新最后消费的 ID                    time.sleep(1)  
        else:  
            print("No new messages, waiting...")  
  
if __name__ == "__main__":  
    consumer()

3. 比较与总结

实现方式 数据结构 特点 适用场景
List List 简单易用,FIFO 队列 轻量级消息队列
Pub/Sub Pub/Sub 实时广播消息,无持久化 实时通知/广播
Stream Stream 持久化、消息确认、多消费者 高可靠、分布式消息队列

4. 常见问题

  1. 消息丢失问题
    • 使用 Stream 数据结构,可以实现消息确认机制,避免消息丢失。
  2. 持久化问题
    • Pub/Sub 不支持持久化;需要持久化时,可以选择 Stream 或外部存储。
  3. 消费者重试机制
    • Stream 支持消息确认,未确认的消息可以被重新消费。

5. 小结

Redis 提供了多种方式实现消息队列,从简单的 List 到强大的 Stream,可以根据业务需求选择合适的方案。

  • 对于简单场景,使用 List 实现 FIFO 队列。
  • 对于实时通知,使用 Pub/Sub。
  • 对于高可靠场景,使用 Stream。
License:  CC BY 4.0