Redis List与Quicklist实现
Redis 的五种数据结构各有特色,用对了才能发挥它的优势。很多人只用到了 String 和 Hash,却不知道 List、Set、ZSet 在特定场景下更合适。本文从应用场景出发,讲什么时候用什么类型。
一、List编码演进
#
1.1 编码历史
Redis 3.2之前: - 满足条件:ziplist(单个ziplist) - 不满足:linkedlist(双向链表,每个节点一个ziplist)
Redis 3.2之后: - quicklist(由多个ziplist组成的双向链表)
|
#
1.2 为什么引入Quicklist
LinkedList的问题:
- 每个节点需要prev/next指针(16字节开销)
- 每个节点单独分配内存(内存碎片)
- CPU缓存不友好(节点不连续)
Ziplist的问题:
Quicklist的解决:
- 每个节点是一个小ziplist(平衡内存和性能)
- 节点间用指针连接(保持双向链表的灵活性)
- 可对节点进行压缩(节省内存)
二、Quicklist结构
#
2.1 整体结构
Quicklist: ┌──────────┐ ┌──────────┐ ┌──────────┐ │ head │───>│ node1 │───>│ node2 │───> NULL │ │ │ ziplist1 │ │ ziplist2 │ │ tail │<───│ │<───│ │ └──────────┘ └──────────┘ └──────────┘ │ ▼ count: 总元素数 len: 节点数 fill: 单个ziplist的最大entry数 compress: 两端保留未压缩的节点数
|
#
2.2 核心结构定义
typedef struct quicklist { quicklistNode *head; quicklistNode *tail; unsigned long count; unsigned long len; int fill : QL_FILL_BITS; unsigned int compress : QL_COMP_BITS; unsigned int bookmark_count: QL_BM_BITS; quicklistBookmark bookmarks[]; } quicklist;
typedef struct quicklistNode { struct quicklistNode *prev; struct quicklistNode *next; unsigned char *zl; unsigned int sz; unsigned int count : 16; unsigned int encoding : 2; unsigned int container : 2; unsigned int recompress : 1; unsigned int attempted_compress : 1; unsigned int extra : 10; } quicklistNode;
|
#
2.3 配置参数
# redis.conf
# 单个ziplist的最大entry数 # 正数:entry数量限制 # 负数:大小限制(-1: 4KB, -2: 8KB, -3: 16KB, -4: 32KB, -5: 64KB) list-max-ziplist-size -2 # 默认-2,即8KB
# 两端不压缩的节点数 # 0: 不压缩 # 1: 两端各1个节点不压缩,中间压缩 # 3: 两端各3个节点不压缩,中间压缩 list-compress-depth 0 # 默认不压缩
|
三、Quicklist操作
#
3.1 LPUSH/RPUSH
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) { quicklistNode *orig_head = quicklist->head; if (likely(quicklist->head && quicklist->head->encoding == QUICKLIST_NODE_ENCODING_RAW && quicklist->head->count < quicklist->fill)) { quicklist->head->zl = ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); quicklist->head->count++; } else { quicklistNode *node = quicklistCreateNode(); node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); node->count = 1; _quicklistInsertNodeBefore(quicklist, quicklist->head, node); } quicklist->count++; return 1; }
|
#
3.2 节点分裂
当ziplist超过大小时,会分裂:
插入前: [node: ziplist( entry1, entry2, entry3, entry4 )]
插入entry5后超过限制: [node1: ziplist( entry1, entry2 )] ↕ [node2: ziplist( entry3, entry4, entry5 )]
|
#
3.3 节点合并
当相邻节点的ziplist都很小时,可能合并:
合并前: [node1: ziplist( entry1 )] ↕ [node2: ziplist( entry2 )]
合并后: [node1: ziplist( entry1, entry2 )]
|
#
3.4 压缩机制
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │未压缩 │<─>│ LZF压缩 │<─>│ LZF压缩 │<─>│ LZF压缩 │<─>│未压缩 │ │头节点 │ │中间节点 │ │中间节点 │ │中间节点 │ │尾节点 │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘
|
LZF压缩:
四、List命令实现
#
4.1 LPOP/RPOP
void lpopCommand(client *c) { robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]); if (o == NULL || checkType(c, o, OBJ_LIST)) return; robj *value = listTypePop(o, LIST_HEAD); if (value == NULL) { addReplyNull(c); } else { addReplyBulk(c, value); if (listTypeLength(o) == 0) { dbDelete(c->db, c->argv[1]); } signalModifiedKey(c, c->db, c->argv[1]); } }
|
#
4.2 LRANGE
void lrangeCommand(client *c) { robj *o = lookupKeyReadOrReply(c, c->argv[1], shared.emptyarray); if (o == NULL || checkType(c, o, OBJ_LIST)) return; long start, end; getLongFromObjectOrReply(c, c->argv[2], &start, NULL); getLongFromObjectOrReply(c, c->argv[3], &end, NULL); long llen = listTypeLength(o); if (start < 0) start = llen + start; if (end < 0) end = llen + end; if (start < 0) start = 0; if (end >= llen) end = llen - 1; listTypeIterator *li = listTypeInitIterator(o, start, LIST_TAIL); listTypeEntry entry; addReplyArrayLen(c, (end - start + 1)); while (listTypeNext(li, &entry)) { if (rangelen-- == 0) break; addReplyBulkCBuffer(c, entry.value, entry.sz); } listTypeReleaseIterator(li); }
|
#
4.3 LTRIM
void ltrimCommand(client *c) { robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.ok); if (o == NULL || checkType(c, o, OBJ_LIST)) return; long start, end; getLongFromObjectOrReply(c, c->argv[2], &start, NULL); getLongFromObjectOrReply(c, c->argv[3], &end, NULL); long llen = listTypeLength(o); listTypeIterator *li; listTypeEntry entry; li = listTypeInitIterator(o, 0, LIST_TAIL); while (listTypeNext(li, &entry) && start--) { listTypeDelete(li, &entry); } listTypeReleaseIterator(li); li = listTypeInitIterator(o, -1, LIST_HEAD); while (listTypeNext(li, &entry) && llen - end - 1 > 0) { listTypeDelete(li, &entry); llen--; } listTypeReleaseIterator(li); addReply(c, shared.ok); }
|
五、阻塞操作实现
#
5.1 BLPOP/BRPOP
void blpopCommand(client *c) { for (int j = 1; j < c->argc - 1; j++) { robj *o = lookupKeyWrite(c->db, c->argv[j]); if (o != NULL && o->type == OBJ_LIST && listTypeLength(o) != 0) { robj *value = listTypePop(o, LIST_HEAD); return; } } blockForKeys(c, BLOCKED_LIST, c->argv + 1, c->argc - 2, timeout, NULL); }
|
#
5.2 唤醒机制
客户端A执行:BLPOP queue 30 │ ├── 如果queue为空 │ └── 客户端A加入阻塞队列(按key组织) │ └── 当客户端B执行:LPUSH queue job └── 检查queue的阻塞客户端列表 └── 唤醒客户端A,弹出job返回
|
注意:
- 阻塞客户端按时间顺序排队
- 多个客户端阻塞同一个key时,先到先服务
- LPUSH可以同时唤醒多个阻塞客户端(如果 push 了多个元素)
六、应用场景实践
#
6.1 消息队列
@Component public class RedisMessageQueue { @Autowired private StringRedisTemplate redis; public void produce(String queue, String message) { redis.opsForList().leftPush(queue, message); } public void consume(String queue) { while (running) { String message = redis.opsForList() .rightPop(queue, 30, TimeUnit.SECONDS); if (message != null) { processMessage(message); } } } public void consumePriority() { while (running) { String message = redis.opsForList() .rightPop("queue:high", "queue:normal", "queue:low", 30, TimeUnit.SECONDS); if (message != null) { processMessage(message); } } } }
|
#
6.2 最新列表(时间线)
@Service public class TimelineService { private static final int MAX_TIMELINE_SIZE = 1000; private static final int PAGE_SIZE = 20; public void postMoment(Long userId, String momentJson) { String key = "timeline:user:" + userId; redis.opsForList().leftPush(key, momentJson); redis.opsForList().trim(key, 0, MAX_TIMELINE_SIZE - 1); redis.expire(key, 7, TimeUnit.DAYS); } public List<String> getTimeline(Long userId, int page) { String key = "timeline:user:" + userId; int start = page * PAGE_SIZE; int end = start + PAGE_SIZE - 1; return redis.opsForList().range(key, start, end); } public List<String> getFeed(Long userId, int page) { List<Long> followings = getFollowings(userId); List<String> keys = followings.stream() .map(id -> "timeline:user:" + id) .collect(Collectors.toList()); return mergeAndSort(keys, page); } }
|
#
6.3 栈(LIFO)
public class UndoRedoManager { private static final String UNDO_STACK = "undo:stack"; private static final String REDO_STACK = "redo:stack"; public void executeAction(String action) { performAction(action); redis.opsForList().leftPush(UNDO_STACK, action); redis.delete(REDO_STACK); } public void undo() { String action = redis.opsForList().leftPop(UNDO_STACK); if (action != null) { reverseAction(action); redis.opsForList().leftPush(REDO_STACK, action); } } public void redo() { String action = redis.opsForList().leftPop(REDO_STACK); if (action != null) { performAction(action); redis.opsForList().leftPush(UNDO_STACK, action); } } }
|
#
6.4 队列(FIFO)
public class TaskQueue { private static final String TASK_QUEUE = "task:queue"; public void submitTask(String taskJson) { redis.opsForList().rightPush(TASK_QUEUE, taskJson); } public void processTasks() { while (true) { String task = redis.opsForList().leftPop(TASK_QUEUE); if (task == null) { Thread.sleep(1000); continue; } executeTask(task); } } }
|
七、List性能优化
#
7.1 避免大List
redis.lpush("big:list", value);
redis.lpush("list", value); redis.ltrim("list", 0, 999);
|
#
7.2 批量操作
for (String item : items) { redis.lpush("list", item); }
redis.executePipelined((RedisCallback<Object>) connection -> { for (String item : items) { connection.lPush("list".getBytes(), item.getBytes()); } return null; });
|
#
7.3 慎用LINDEX深索引
LINDEX list 0 LINDEX list -1 LINDEX list 50000
|
#
7.4 List分片
public void addToShardList(String key, String value) { String shardKey = getCurrentShard(key); Long size = redis.opsForList().size(shardKey); if (size != null && size >= 10000) { shardKey = createNewShard(key); } redis.opsForList().leftPush(shardKey, value); }
|
八、常见问题
#
8.1 List的最大长度
理论限制:约40亿个元素(2^32 - 1) 实际限制:内存大小
建议:单个List不超过10000个元素
|
#
8.2 阻塞操作的超时
redis.opsForList().rightPop("queue", 30, TimeUnit.SECONDS);
redis.opsForList().rightPop("queue", 1, TimeUnit.SECONDS);
|
#
8.3 阻塞与事务
九、总结
| 特性 |
Quicklist |
| 结构 |
ziplist + 双向链表 |
| 头部/尾部操作 |
O(1) |
| 中间插入/删除 |
O(n) |
| 索引访问 |
O(n) |
| 内存 |
连续内存 + 指针开销 |
List使用建议:
- LPUSH + LTRIM:实现固定大小的最新列表
- BRPOP:实现高效的消息队列
- 控制大小:避免List无限增长
- 两端操作优先:避免中间操作
- 考虑Stream:需要持久化和消费者组时,用Stream替代List
Quicklist的设计体现了Redis的核心思想:在内存和性能之间寻找最佳平衡点。
核心要点
String:简单的键值对,适合缓存、计数器
Hash:存储对象属性,适合用户信息、配置
List:有序列表,适合消息队列、最新列表
Set:无序去重,适合共同好友、抽奖
ZSet:有序集合,适合排行榜、积分系统
总结
选择合适的数据结构是使用 Redis 的关键。在实际项目中,根据业务需求选择合适的类型,可以提升性能和开发效率。