Redis Stream消息队列

Redis Stream消息队列

Redis 的五种数据结构各有特色,用对了才能发挥它的优势。很多人只用到了 String 和 Hash,却不知道 List、Set、ZSet 在特定场景下更合适。本文从应用场景出发,讲什么时候用什么类型。

一、Stream基础

#

1.1 什么是Stream

Stream是Redis的日志型数据结构:

┌─────────────────────────────────────────────────────────────┐
│ Stream: mystream │
│ │
│ ID │ Field-Value Pairs │
│ ──────────────────────┼──────────────────────────────── │
│ 1724620800000-0 │ sensor-id: 1234, temperature: 25 │
│ 1724620801000-0 │ sensor-id: 1235, temperature: 26 │
│ 1724620802000-0 │ sensor-id: 1236, temperature: 24 │
│ 1724620803000-0 │ sensor-id: 1234, temperature: 25.5 │
│ ... │ ... │
└─────────────────────────────────────────────────────────────┘

特点:
- 消息按ID有序排列
- ID由时间戳+序列号组成
- 消息持久化存储
- 支持消费者组

#

1.2 Stream ID格式

Stream ID: <millisecondsTime>-<sequenceNumber>

示例:
1724620800000-0 # 时间戳1724620800000毫秒,序列号0
1724620800000-1 # 同一时间,序列号1
1724620801000-0 # 下一毫秒,序列号0

特殊ID:
* # 让Redis自动生成ID
0 # 最小ID
$ # 最大ID(最新)
+ # 同*
- # 同0

二、Stream命令

#

2.1 添加消息

# XADD stream_name [MAXLEN len] [ID id] field value [field value ...]

# 添加消息(自动生成ID)
XADD mystream * sensor-id 1234 temperature 25
# 返回: "1724620800000-0"

# 添加消息(指定ID,通常不推荐)
XADD mystream 1724620800001-0 sensor-id 1235 temperature 26

# 限制Stream长度(近似裁剪)
XADD mystream MAXLEN ~ 1000 * sensor-id 1236 temperature 24

# 精确限制长度
XADD mystream MAXLEN 1000 * sensor-id 1237 temperature 27

#

2.2 读取消息

# XRANGE: 按范围读取
XRANGE mystream - + # 读取所有
XRANGE mystream - + COUNT 10 # 读取前10条
XRANGE mystream 1724620800000-0 1724620800001-0 # 读取指定范围

# XREVRANGE: 反向读取
XREVRANGE mystream + - COUNT 10 # 读取最新10条

# XREAD: 阻塞读取(类似List的BLPOP)
XREAD BLOCK 5000 STREAMS mystream 0 # 从开头读取,阻塞5秒
XREAD BLOCK 0 STREAMS mystream $ # 从最新位置读取,永久阻塞
XREAD COUNT 10 STREAMS mystream 0 # 非阻塞读取10条

#

2.3 删除消息

# XDEL: 删除指定消息
XDEL mystream 1724620800000-0

# XTRIM: 裁剪Stream长度
XTRIM mystream MAXLEN 1000 # 精确裁剪到1000条
XTRIM mystream MAXLEN ~ 1000 # 近似裁剪

#

2.4 获取Stream信息

# XLEN: 获取消息数量
XLEN mystream

# XINFO STREAM: 获取Stream详细信息
XINFO STREAM mystream

# XINFO GROUPS: 获取消费者组信息
XINFO GROUPS mystream

# XINFO CONSUMERS: 获取消费者信息
XINFO CONSUMERS mystream mygroup

三、消费者组

#

3.1 消费者组概念

┌─────────────────────────────────────────┐
│ Stream: orders │
└─────────────────────────────────────────┘

┌─────────┴─────────┐
▼ ▼
┌─────────┐ ┌─────────┐
│ Group A │ │ Group B │
│consumer1│ │consumer3│
│consumer2│ │consumer4│
└─────────┘ └─────────┘

特点:
- 同组内消息不重复消费(负载均衡)
- 不同组独立消费(广播)
- 支持消息确认(ACK)
- 支持pending列表(未确认消息)

#

3.2 消费者组命令

# XGROUP CREATE: 创建消费者组
XGROUP CREATE mystream mygroup $ MKSTREAM
# $: 从最新消息开始
# 0: 从第一条消息开始
# MKSTREAM: Stream不存在则创建

# XREADGROUP: 消费者组读取
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream >
# >: 读取未分配给任何消费者的消息

# XACK: 确认消息
XACK mystream mygroup 1724620800000-0 1724620800001-0

# XPENDING: 查看pending消息
XPENDING mystream mygroup
XPENDING mystream mygroup - + 10

# XCLAIM: 转移未确认消息
XCLAIM mystream mygroup consumer2 3600000 1724620800000-0
# 3600000: 空闲时间超过1小时的pending消息

# XGROUP DESTROY: 删除消费者组
XGROUP DESTROY mystream mygroup

四、Java中使用Stream

#

4.1 Spring Data Redis

@Service
public class StreamService {

@Autowired
private StringRedisTemplate redis;

/**
* 添加消息到Stream
*/
public RecordId addMessage(String streamKey, Map<String, String> message) {
ObjectRecord<String, String> record = StreamRecords.newRecord()
.in(streamKey)
.ofMap(message);

return redis.opsForStream().add(record);
}

/**
* 创建消费者组
*/
public String createConsumerGroup(String streamKey, String groupName) {
return redis.opsForStream().createGroup(streamKey, groupName);
}

/**
* 消费者组读取消息
*/
public List<MapRecord<String, Object, Object>> readMessages(
String streamKey, String groupName, String consumerName, int count) {

Consumer consumer = Consumer.from(groupName, consumerName);

StreamReadOptions options = StreamReadOptions.empty()
.count(count)
.block(Duration.ofSeconds(5));

StreamOffset<String> offset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());

return redis.opsForStream().read(consumer, options, offset);
}

/**
* 确认消息
*/
public Long acknowledge(String streamKey, String groupName, String... recordIds) {
return redis.opsForStream().acknowledge(streamKey, groupName, recordIds);
}

/**
* 查看pending消息
*/
public PendingMessagesSummary pendingSummary(String streamKey, String groupName) {
return redis.opsForStream().pending(streamKey, groupName);
}
}

#

4.2 生产者实现

@Service
public class OrderEventProducer {

@Autowired
private StreamService streamService;

private static final String ORDER_STREAM = "orders";

/**
* 发送订单创建事件
*/
public void sendOrderCreated(Order order) {
Map<String, String> message = new HashMap<>();
message.put("eventType", "ORDER_CREATED");
message.put("orderId", String.valueOf(order.getId()));
message.put("userId", String.valueOf(order.getUserId()));
message.put("amount", String.valueOf(order.getAmount()));
message.put("timestamp", String.valueOf(System.currentTimeMillis()));

RecordId recordId = streamService.addMessage(ORDER_STREAM, message);
System.out.println("Order event sent: " + recordId);
}

/**
* 发送订单支付事件
*/
public void sendOrderPaid(Long orderId, String paymentId) {
Map<String, String> message = new HashMap<>();
message.put("eventType", "ORDER_PAID");
message.put("orderId", String.valueOf(orderId));
message.put("paymentId", paymentId);
message.put("timestamp", String.valueOf(System.currentTimeMillis()));

streamService.addMessage(ORDER_STREAM, message);
}
}

#

4.3 消费者实现

@Component
public class OrderEventConsumer {

@Autowired
private StreamService streamService;
@Autowired
private OrderService orderService;

private static final String ORDER_STREAM = "orders";
private static final String GROUP_NAME = "order-processors";
private static final String CONSUMER_NAME = "consumer-" + UUID.randomUUID().toString().substring(0, 8);

@PostConstruct
public void init() {
try {
streamService.createConsumerGroup(ORDER_STREAM, GROUP_NAME);
} catch (Exception e) {
// 组已存在
}

// 启动消费线程
startConsuming();
}

private void startConsuming() {
new Thread(() -> {
while (!Thread.interrupted()) {
try {
List<MapRecord<String, Object, Object>> records =
streamService.readMessages(ORDER_STREAM, GROUP_NAME, CONSUMER_NAME, 10);

for (MapRecord<String, Object, Object> record : records) {
processRecord(record);
}
} catch (Exception e) {
System.err.println("消费异常: " + e.getMessage());
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}).start();
}

private void processRecord(MapRecord<String, Object, Object> record) {
try {
Map<Object, Object> value = record.getValue();
String eventType = (String) value.get("eventType");

switch (eventType) {
case "ORDER_CREATED":
handleOrderCreated(value);
break;
case "ORDER_PAID":
handleOrderPaid(value);
break;
default:
System.out.println("Unknown event: " + eventType);
}

// 确认消息
streamService.acknowledge(ORDER_STREAM, GROUP_NAME, record.getId().getValue());

} catch (Exception e) {
System.err.println("处理消息失败: " + e.getMessage());
// 不确认,消息会留在pending列表
}
}

private void handleOrderCreated(Map<Object, Object> value) {
Long orderId = Long.valueOf((String) value.get("orderId"));
System.out.println("处理订单创建: " + orderId);
// 业务处理
}

private void handleOrderPaid(Map<Object, Object> value) {
Long orderId = Long.valueOf((String) value.get("orderId"));
System.out.println("处理订单支付: " + orderId);
// 业务处理
}
}

#

4.4 Spring Boot Stream Listener

@Configuration
public class StreamListenerConfig {

@Bean
public Subscription orderSubscription(RedisConnectionFactory factory) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();

StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
StreamMessageListenerContainer.create(factory, options);

Consumer consumer = Consumer.from("order-group", "consumer-1");
StreamOffset<String> offset = StreamOffset.create("orders", ReadOffset.lastConsumed());

Subscription subscription = container.receive(consumer, offset, message -> {
System.out.println("Received: " + message.getValue());
});

container.start();
return subscription;
}
}

五、Stream vs List作为消息队列

特性 Stream List
消息持久化
消息ID 自动生成时间戳ID 无(需自己维护)
消费者组 原生支持 不支持
消息确认 支持ACK 不支持
消息回溯 支持 不支持(pop后删除)
消息数量 可查询 可查询
消息内容 支持多字段 单个String
阻塞读取 支持 支持(BLPOP)
适用场景 复杂消息队列 简单队列

六、最佳实践

#

6.1 消息清理

# 设置Stream最大长度(自动清理旧消息)
XADD mystream MAXLEN ~ 10000 * field value

# 手动裁剪
XTRIM mystream MAXLEN 10000

# 定期清理(配合过期时间无法实现,Stream不支持expire)
# 建议:
# 1. 使用MAXLEN自动裁剪
# 2. 定期XTRIM
# 3. 按时间归档到持久化存储

#

6.2 异常处理

@Component
public class StreamErrorHandler {

@Autowired
private StringRedisTemplate redis;

/**
* 处理pending消息(消费失败的消息)
*/
@Scheduled(fixedRate = 60000) // 每分钟检查
public void handlePendingMessages() {
String streamKey = "orders";
String groupName = "order-processors";

// 获取pending消息
PendingMessagesSummary summary = redis.opsForStream().pending(streamKey, groupName);

if (summary.getTotalPendingMessages() > 0) {
// 获取详细信息
Range<String> range = Range.closed("-", "+");
PendingMessages pending = redis.opsForStream().pending(
streamKey, groupName,
Consumer.from(groupName, "consumer-1"),
range, 100
);

for (PendingMessage message : pending) {
// 如果消息被多次消费失败,可以:
// 1. 转移到死信队列
// 2. 记录日志人工处理
// 3. 删除消息(不推荐)

if (message.getTotalDeliveryCount() > 3) {
// 超过3次,转移到死信队列
redis.opsForStream().add("orders:dlq",
Collections.singletonMap("originalId", message.getIdAsString()));

// 确认原消息
redis.opsForStream().acknowledge(streamKey, groupName, message.getIdAsString());
}
}
}
}
}

#

6.3 监控

# 查看Stream长度
XLEN mystream

# 查看消费者组状态
XINFO GROUPS mystream

# 查看pending消息数
XPENDING mystream mygroup

# 查看消费者状态
XINFO CONSUMERS mystream mygroup

七、总结

场景 推荐方案
简单队列 List + BLPOP
复杂消息队列 Stream + 消费者组
需要ACK确认 Stream
需要消息回溯 Stream
日志收集 Stream
事件驱动架构 Stream + 多消费者组

Stream的核心价值:

  1. 消息持久化:不丢失消息
  2. 消费者组:负载均衡和广播
  3. 消息确认:可靠消费
  4. 消息回溯:可重新消费历史消息

使用建议:

  1. 使用MAXLEN控制Stream大小
  2. 及时处理pending消息
  3. 合理设置消费者组数量
  4. 监控Stream长度和消费者延迟

核心要点

  1. String:简单的键值对,适合缓存、计数器

  2. Hash:存储对象属性,适合用户信息、配置

  3. List:有序列表,适合消息队列、最新列表

  4. Set:无序去重,适合共同好友、抽奖

  5. ZSet:有序集合,适合排行榜、积分系统

总结

选择合适的数据结构是使用 Redis 的关键。在实际项目中,根据业务需求选择合适的类型,可以提升性能和开发效率。


   转载规则


《Redis Stream消息队列》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录