Redis List与Quicklist实现

Redis List与Quicklist实现

Redis 的五种数据结构各有特色,用对了才能发挥它的优势。很多人只用到了 String 和 Hash,却不知道 List、Set、ZSet 在特定场景下更合适。本文从应用场景出发,讲什么时候用什么类型。

一、List编码演进

#

1.1 编码历史

Redis 3.2之前:
- 满足条件:ziplist(单个ziplist)
- 不满足:linkedlist(双向链表,每个节点一个ziplist)

Redis 3.2之后:
- quicklist(由多个ziplist组成的双向链表)

#

1.2 为什么引入Quicklist

LinkedList的问题

  • 每个节点需要prev/next指针(16字节开销)
  • 每个节点单独分配内存(内存碎片)
  • CPU缓存不友好(节点不连续)

Ziplist的问题

  • 大ziplist插入/删除效率低
  • 连锁更新问题

Quicklist的解决

  • 每个节点是一个小ziplist(平衡内存和性能)
  • 节点间用指针连接(保持双向链表的灵活性)
  • 可对节点进行压缩(节省内存)

二、Quicklist结构

#

2.1 整体结构

Quicklist:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ head │───>│ node1 │───>│ node2 │───> NULL
│ │ │ ziplist1 │ │ ziplist2 │
│ tail │<───│ │<───│ │
└──────────┘ └──────────┘ └──────────┘


count: 总元素数
len: 节点数
fill: 单个ziplist的最大entry数
compress: 两端保留未压缩的节点数

#

2.2 核心结构定义

typedef struct quicklist {
quicklistNode *head; // 头节点
quicklistNode *tail; // 尾节点
unsigned long count; // 总元素数量
unsigned long len; // 节点数量
int fill : QL_FILL_BITS; // 单个ziplist的fill因子
unsigned int compress : QL_COMP_BITS; // 压缩深度
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;

typedef struct quicklistNode {
struct quicklistNode *prev; // 前驱指针
struct quicklistNode *next; // 后继指针
unsigned char *zl; // ziplist数据
unsigned int sz; // ziplist大小(字节)
unsigned int count : 16; // ziplist中entry数
unsigned int encoding : 2; // 编码:RAW或LZF压缩
unsigned int container : 2; // 容器类型
unsigned int recompress : 1; // 是否临时解压
unsigned int attempted_compress : 1;
unsigned int extra : 10;
} quicklistNode;

#

2.3 配置参数

# redis.conf

# 单个ziplist的最大entry数
# 正数:entry数量限制
# 负数:大小限制(-1: 4KB, -2: 8KB, -3: 16KB, -4: 32KB, -5: 64KB)
list-max-ziplist-size -2 # 默认-2,即8KB

# 两端不压缩的节点数
# 0: 不压缩
# 1: 两端各1个节点不压缩,中间压缩
# 3: 两端各3个节点不压缩,中间压缩
list-compress-depth 0 # 默认不压缩

三、Quicklist操作

#

3.1 LPUSH/RPUSH

// 在头部插入
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;

// 尝试在头部节点插入
if (likely(quicklist->head &&
quicklist->head->encoding == QUICKLIST_NODE_ENCODING_RAW &&
quicklist->head->count < quicklist->fill)) {
// 头部ziplist还有空间,直接插入
quicklist->head->zl = ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklist->head->count++;
} else {
// 需要新建节点
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
node->count = 1;

_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}

quicklist->count++;
return 1;
}

#

3.2 节点分裂

当ziplist超过大小时,会分裂:

插入前:
[node: ziplist( entry1, entry2, entry3, entry4 )]

插入entry5后超过限制:
[node1: ziplist( entry1, entry2 )]

[node2: ziplist( entry3, entry4, entry5 )]

#

3.3 节点合并

当相邻节点的ziplist都很小时,可能合并:

合并前:
[node1: ziplist( entry1 )]

[node2: ziplist( entry2 )]

合并后:
[node1: ziplist( entry1, entry2 )]

#

3.4 压缩机制

// 压缩深度为1时:
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│未压缩 │<─>│ LZF压缩 │<─>│ LZF压缩 │<─>│ LZF压缩 │<─>│未压缩 │
│头节点 │ │中间节点 │ │中间节点 │ │中间节点 │ │尾节点 │
└─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘

// 访问中间节点时临时解压,访问完可选择重新压缩

LZF压缩

  • 轻量级实时压缩算法
  • 压缩/解压速度快
  • 压缩比适中

四、List命令实现

#

4.1 LPOP/RPOP

void lpopCommand(client *c) {
robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]);
if (o == NULL || checkType(c, o, OBJ_LIST)) return;

// 从头部弹出
robj *value = listTypePop(o, LIST_HEAD);
if (value == NULL) {
addReplyNull(c);
} else {
addReplyBulk(c, value);
// 如果List为空,删除key
if (listTypeLength(o) == 0) {
dbDelete(c->db, c->argv[1]);
}
signalModifiedKey(c, c->db, c->argv[1]);
}
}

#

4.2 LRANGE

void lrangeCommand(client *c) {
robj *o = lookupKeyReadOrReply(c, c->argv[1], shared.emptyarray);
if (o == NULL || checkType(c, o, OBJ_LIST)) return;

long start, end;
getLongFromObjectOrReply(c, c->argv[2], &start, NULL);
getLongFromObjectOrReply(c, c->argv[3], &end, NULL);

// 处理负数索引
long llen = listTypeLength(o);
if (start < 0) start = llen + start;
if (end < 0) end = llen + end;
if (start < 0) start = 0;
if (end >= llen) end = llen - 1;

// 遍历输出
listTypeIterator *li = listTypeInitIterator(o, start, LIST_TAIL);
listTypeEntry entry;

addReplyArrayLen(c, (end - start + 1));
while (listTypeNext(li, &entry)) {
if (rangelen-- == 0) break;
addReplyBulkCBuffer(c, entry.value, entry.sz);
}
listTypeReleaseIterator(li);
}

#

4.3 LTRIM

void ltrimCommand(client *c) {
robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.ok);
if (o == NULL || checkType(c, o, OBJ_LIST)) return;

long start, end;
getLongFromObjectOrReply(c, c->argv[2], &start, NULL);
getLongFromObjectOrReply(c, c->argv[3], &end, NULL);

// 计算需要保留的范围
long llen = listTypeLength(o);
// ... 处理负数索引 ...

// 删除范围外的元素
listTypeIterator *li;
listTypeEntry entry;

// 删除头部多余元素
li = listTypeInitIterator(o, 0, LIST_TAIL);
while (listTypeNext(li, &entry) && start--) {
listTypeDelete(li, &entry);
}
listTypeReleaseIterator(li);

// 删除尾部多余元素
li = listTypeInitIterator(o, -1, LIST_HEAD);
while (listTypeNext(li, &entry) && llen - end - 1 > 0) {
listTypeDelete(li, &entry);
llen--;
}
listTypeReleaseIterator(li);

addReply(c, shared.ok);
}

五、阻塞操作实现

#

5.1 BLPOP/BRPOP

void blpopCommand(client *c) {
// 1. 先尝试非阻塞弹出
for (int j = 1; j < c->argc - 1; j++) {
robj *o = lookupKeyWrite(c->db, c->argv[j]);
if (o != NULL && o->type == OBJ_LIST && listTypeLength(o) != 0) {
// List非空,直接弹出
robj *value = listTypePop(o, LIST_HEAD);
// ... 返回结果 ...
return;
}
}

// 2. 所有List都为空,阻塞等待
blockForKeys(c, BLOCKED_LIST, c->argv + 1, c->argc - 2,
timeout, NULL);
}

#

5.2 唤醒机制

客户端A执行:BLPOP queue 30

├── 如果queue为空
│ └── 客户端A加入阻塞队列(按key组织)

└── 当客户端B执行:LPUSH queue job
└── 检查queue的阻塞客户端列表
└── 唤醒客户端A,弹出job返回

注意

  • 阻塞客户端按时间顺序排队
  • 多个客户端阻塞同一个key时,先到先服务
  • LPUSH可以同时唤醒多个阻塞客户端(如果 push 了多个元素)

六、应用场景实践

#

6.1 消息队列

@Component
public class RedisMessageQueue {

@Autowired
private StringRedisTemplate redis;

// 生产者
public void produce(String queue, String message) {
redis.opsForList().leftPush(queue, message);
}

// 消费者(阻塞式)
public void consume(String queue) {
while (running) {
// 阻塞弹出,最多等待30秒
String message = redis.opsForList()
.rightPop(queue, 30, TimeUnit.SECONDS);

if (message != null) {
processMessage(message);
}
}
}

// 多个队列优先级消费
public void consumePriority() {
while (running) {
// 从左到右按优先级
String message = redis.opsForList()
.rightPop("queue:high", "queue:normal", "queue:low",
30, TimeUnit.SECONDS);
if (message != null) {
processMessage(message);
}
}
}
}

#

6.2 最新列表(时间线)

@Service
public class TimelineService {

private static final int MAX_TIMELINE_SIZE = 1000;
private static final int PAGE_SIZE = 20;

// 发布动态
public void postMoment(Long userId, String momentJson) {
String key = "timeline:user:" + userId;

// LPUSH新动态到头部
redis.opsForList().leftPush(key, momentJson);

// LTRIM保留最近1000条
redis.opsForList().trim(key, 0, MAX_TIMELINE_SIZE - 1);

// 设置过期时间
redis.expire(key, 7, TimeUnit.DAYS);
}

// 获取时间线(分页)
public List<String> getTimeline(Long userId, int page) {
String key = "timeline:user:" + userId;
int start = page * PAGE_SIZE;
int end = start + PAGE_SIZE - 1;
return redis.opsForList().range(key, start, end);
}

// 获取关注者的时间线(拉模式)
public List<String> getFeed(Long userId, int page) {
// 获取关注列表
List<Long> followings = getFollowings(userId);

// 获取每个关注者的最新动态
List<String> keys = followings.stream()
.map(id -> "timeline:user:" + id)
.collect(Collectors.toList());

// 使用LRANGE获取后合并排序(应用层)
// 或使用Redis 6.2+的LMPOP

return mergeAndSort(keys, page);
}
}

#

6.3 栈(LIFO)

// 实现撤销/重做功能
public class UndoRedoManager {

private static final String UNDO_STACK = "undo:stack";
private static final String REDO_STACK = "redo:stack";

public void executeAction(String action) {
// 执行操作
performAction(action);

// 压入撤销栈
redis.opsForList().leftPush(UNDO_STACK, action);

// 清空重做栈
redis.delete(REDO_STACK);
}

public void undo() {
String action = redis.opsForList().leftPop(UNDO_STACK);
if (action != null) {
// 反向执行
reverseAction(action);

// 压入重做栈
redis.opsForList().leftPush(REDO_STACK, action);
}
}

public void redo() {
String action = redis.opsForList().leftPop(REDO_STACK);
if (action != null) {
performAction(action);
redis.opsForList().leftPush(UNDO_STACK, action);
}
}
}

#

6.4 队列(FIFO)

// 简单的任务队列
public class TaskQueue {

private static final String TASK_QUEUE = "task:queue";

public void submitTask(String taskJson) {
redis.opsForList().rightPush(TASK_QUEUE, taskJson);
}

public void processTasks() {
while (true) {
// 非阻塞消费
String task = redis.opsForList().leftPop(TASK_QUEUE);
if (task == null) {
Thread.sleep(1000);
continue;
}
executeTask(task);
}
}
}

七、List性能优化

#

7.1 避免大List

// 不好的做法:无限制增长
redis.lpush("big:list", value); // 从不清理

// 好的做法:控制大小
redis.lpush("list", value);
redis.ltrim("list", 0, 999); // 只保留1000条

#

7.2 批量操作

// 不好的做法:循环插入
for (String item : items) {
redis.lpush("list", item); // 每次网络往返
}

// 好的做法:使用Pipeline
redis.executePipelined((RedisCallback<Object>) connection -> {
for (String item : items) {
connection.lPush("list".getBytes(), item.getBytes());
}
return null;
});

#

7.3 慎用LINDEX深索引

# 越靠近中间越慢(O(n))
LINDEX list 0 # 快(头部)
LINDEX list -1 # 快(尾部)
LINDEX list 50000 # 慢(中间)

#

7.4 List分片

// 超大List分片存储
public void addToShardList(String key, String value) {
// 获取当前分片
String shardKey = getCurrentShard(key);
Long size = redis.opsForList().size(shardKey);

// 如果当前分片已满,创建新分片
if (size != null && size >= 10000) {
shardKey = createNewShard(key);
}

redis.opsForList().leftPush(shardKey, value);
}

八、常见问题

#

8.1 List的最大长度

理论限制:约40亿个元素(2^32 - 1)
实际限制:内存大小

建议:单个List不超过10000个元素

#

8.2 阻塞操作的超时

// 超时时间不要太短(减少CPU消耗)
// 好的做法:30-60秒
redis.opsForList().rightPop("queue", 30, TimeUnit.SECONDS);

// 不好的做法:1秒(频繁空转)
redis.opsForList().rightPop("queue", 1, TimeUnit.SECONDS);

#

8.3 阻塞与事务

# 阻塞命令不能在MULTI/EXEC中使用
MULTI
BLPOP queue 30 # 错误!会立即返回nil
EXEC

九、总结

特性 Quicklist
结构 ziplist + 双向链表
头部/尾部操作 O(1)
中间插入/删除 O(n)
索引访问 O(n)
内存 连续内存 + 指针开销

List使用建议:

  1. LPUSH + LTRIM:实现固定大小的最新列表
  2. BRPOP:实现高效的消息队列
  3. 控制大小:避免List无限增长
  4. 两端操作优先:避免中间操作
  5. 考虑Stream:需要持久化和消费者组时,用Stream替代List

Quicklist的设计体现了Redis的核心思想:在内存和性能之间寻找最佳平衡点。

核心要点

  1. String:简单的键值对,适合缓存、计数器

  2. Hash:存储对象属性,适合用户信息、配置

  3. List:有序列表,适合消息队列、最新列表

  4. Set:无序去重,适合共同好友、抽奖

  5. ZSet:有序集合,适合排行榜、积分系统

总结

选择合适的数据结构是使用 Redis 的关键。在实际项目中,根据业务需求选择合适的类型,可以提升性能和开发效率。


   转载规则


《Redis List与Quicklist实现》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录