RedisMQ
Redis实现消息队列
Redis 是一种高性能的内存数据存储工具,支持多种数据结构(如 list、set、pub/sub等),使其成为实现轻量级消息队列的理想选择。
本文将详细介绍 Redis 实现消息队列的几种方式,并提供具体的代码示例。
1. 消息队列的实现方式
1.1 使用 Redis List 结构实现简单消息队列
Redis 的 List
数据结构天然支持 先进先出 (FIFO) 的特性,可以通过 LPUSH
和 RPOP
命令来实现消息的发布与消费。
- LPUSH:将消息添加到队列的左端。
- RPOP:从队列的右端弹出消息,实现消费。
优缺点
- 优点:简单易用,性能高。
- 缺点:没有消息确认机制,消息可能丢失。
1.2 使用 Redis Pub/Sub 实现实时消息队列
Redis 的 发布/订阅 (Pub/Sub) 功能允许消息的实时广播,多个订阅者可以接收同一条消息。
- PUBLISH:发布消息。
- SUBSCRIBE:订阅频道,实时接收消息。
优缺点
- 优点:实时性强,适合广播消息。
- 缺点:消息不持久,订阅者掉线后无法接收到历史消息。
1.3 使用 Redis Stream 实现可靠消息队列
Redis 5.0 引入了 Stream 数据结构,可以实现可靠的消息队列,支持:
- 消息持久化。
- 消息消费确认机制。
- 多个消费者组(Consumer Groups)。
优缺点
- 优点:支持持久化、消息确认机制,适合高可靠场景。
- 缺点:相对复杂,需要学习新的命令。
2. 示例实现
仓库地址 https://gitea.hhdxw.top/yovinchen/RedisMQTest.git
2.1 使用 List 实现简单消息队列
代码示例:生产者和消费者
生产者:向 List 添加消息
import redis
import time
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
# 队列名称
QUEUE_NAME = "task_queue"
# 生产者:向队列中添加消息
def producer():
for i in range(1, 100):
message = f"task-{i}"
redis_client.lpush(QUEUE_NAME, message) # 将消息推送到队列左端
print(f"Produced: {message}")
time.sleep(1) # 模拟生产消息的间隔
if __name__ == "__main__":
import threading
# 生产者
producer_thread = threading.Thread(target=producer)
producer_thread.start()
producer_thread.join()
消费者:消费 List 中的消息
import redis
import time
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
# 队列名称
QUEUE_NAME = "task_queue"
# 消费者:从队列中获取消息
def consumer():
while True:
message = redis_client.rpop(QUEUE_NAME) # 从队列右端弹出消息
if message:
print(f"Consumed: {message.decode('utf-8')}")
else:
print("No messages, waiting...")
time.sleep(1)
if __name__ == "__main__":
import threading
# 消费者
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()
consumer_thread.join()
2.2 使用 Pub/Sub 实现实时消息队列
代码示例:发布者和订阅者
发布者(Publisher):发布消息
import redis
import time
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
CHANNEL_NAME = "news_channel"
# 发布消息
def publisher():
for i in range(1, 100):
message = f"news-{i}"
redis_client.publish(CHANNEL_NAME, message) # 发布消息到频道
print(f"Published: {message}")
time.sleep(1)
if __name__ == "__main__":
publisher()
订阅者(Subscriber):接收消息
import redis
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
CHANNEL_NAME = "news_channel"
# 订阅频道
def subscriber():
pubsub = redis_client.pubsub()
pubsub.subscribe(CHANNEL_NAME)
print(f"Subscribed to {CHANNEL_NAME}")
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received: {message['data'].decode('utf-8')}")
if __name__ == "__main__":
subscriber()
2.3 使用 Stream 实现可靠消息队列
代码示例:生产者和消费者
生产者:向 Stream 添加消息
import redis
import time
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
STREAM_NAME = "task_stream"
# 生产者:向 Stream 中添加消息
def producer():
for i in range(1, 1000):
message = {"task": f"task-{i}"}
redis_client.xadd(STREAM_NAME, message) # 添加消息到 Stream print(f"Produced: {message}")
time.sleep(1)
if __name__ == "__main__":
producer()
消费者:读取 Stream 中的消息
import redis
import time
# 连接 Redisredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
STREAM_NAME = "task_stream"
# 消费者:读取 Stream 中的消息
def consumer():
last_id = "0" # 从头开始读取消息
while True:
messages = redis_client.xread({STREAM_NAME: last_id}, count=1, block=1000)
if messages:
for stream, message_list in messages:
for message_id, message in message_list:
print(f"Consumed: ID={message_id}, Message={message}")
last_id = message_id # 更新最后消费的 ID time.sleep(1)
else:
print("No new messages, waiting...")
if __name__ == "__main__":
consumer()
3. 比较与总结
实现方式 | 数据结构 | 特点 | 适用场景 |
---|---|---|---|
List | List | 简单易用,FIFO 队列 | 轻量级消息队列 |
Pub/Sub | Pub/Sub | 实时广播消息,无持久化 | 实时通知/广播 |
Stream | Stream | 持久化、消息确认、多消费者 | 高可靠、分布式消息队列 |
4. 常见问题
- 消息丢失问题
- 使用 Stream 数据结构,可以实现消息确认机制,避免消息丢失。
- 持久化问题
- Pub/Sub 不支持持久化;需要持久化时,可以选择 Stream 或外部存储。
- 消费者重试机制
- Stream 支持消息确认,未确认的消息可以被重新消费。
5. 小结
Redis 提供了多种方式实现消息队列,从简单的 List 到强大的 Stream,可以根据业务需求选择合适的方案。
- 对于简单场景,使用 List 实现 FIFO 队列。
- 对于实时通知,使用 Pub/Sub。
- 对于高可靠场景,使用 Stream。
License:
CC BY 4.0