Redis管道Pipeline与批量操作

Redis管道Pipeline与批量操作

Redis 的五种数据结构各有特色,用对了才能发挥它的优势。很多人只用到了 String 和 Hash,却不知道 List、Set、ZSet 在特定场景下更合适。本文从应用场景出发,讲什么时候用什么类型。

一、为什么需要批量操作

#

1.1 网络往返的开销

单独执行1000个GET命令:

客户端 网络 Redis
│──GET 1──> 1ms ──> │
│<─结果1── 1ms <── │
│──GET 2──> 1ms ──> │
│<─结果2── 1ms <── │
...
│──GET 1000─> 1ms ──> │
│<─结果1000 1ms <── │

总时间 = 1000 × 2ms = 2000ms

#

1.2 批量操作的优势

使用Pipeline执行1000个GET:

客户端 网络 Redis
│──GET 1──> 1ms ──> │
│──GET 2──> │
│──GET 3──> │
│ ... │
│──GET 1000 │
│ 1ms <── │
│<─结果1─2─3...1000 │

总时间 ≈ 2ms + 处理时间

二、原生批量命令

#

2.1 MGET/MSET

# 批量获取
MGET key1 key2 key3 key4 key5

# 批量设置
MSET key1 value1 key2 value2 key3 value3

# 批量设置并指定过期时间(Redis 2.6.12+)
MSET key1 value1 key2 value2 EX 60

Java使用

@Autowired
private StringRedisTemplate redis;

// MGET
List<String> values = redis.opsForValue().multiGet(Arrays.asList("key1", "key2", "key3"));

// MSET
Map<String, String> map = new HashMap<>();
map.put("key1", "value1");
map.put("key2", "value2");
redis.opsForValue().multiSet(map);

限制

  • 只能操作String类型
  • 所有key需要在同一slot(Cluster模式下)

#

2.2 HMGET/HMSET/HGETALL

# Hash批量操作
HMSET user:1001 name "Alice" age 25 city "Beijing"
HMGET user:1001 name age city
HGETALL user:1001

#

2.3 LPUSH/RPUSH(批量)

# List批量插入
LPUSH mylist value1 value2 value3 value4 value5

#

2.4 SADD/ZADD(批量)

# Set批量添加
SADD myset member1 member2 member3

# ZSet批量添加
ZADD myzset 1 member1 2 member2 3 member3

#

2.5 DEL/UNLINK(批量)

# 批量删除
DEL key1 key2 key3 key4 key5

# 异步删除(不阻塞,Redis 4.0+)
UNLINK key1 key2 key3

三、Pipeline管道

#

3.1 Pipeline原理

普通模式:            Pipeline模式:

请求1 ──> 请求1 ──>
<── 响应1 请求2 ──>
请求2 ──> 请求3 ──>
<── 响应2 ...
请求3 ──> <── 响应1,2,3...
<── 响应3

Pipeline不保证原子性,只是将多个命令打包发送,减少网络往返。

#

3.2 Pipeline使用

@Service
public class PipelineService {

@Autowired
private StringRedisTemplate redis;

// 批量写入
public void batchSet(Map<String, String> data) {
redis.executePipelined((RedisCallback<Object>) connection -> {
data.forEach((key, value) -> {
connection.stringCommands().set(
key.getBytes(), value.getBytes()
);
});
return null;
});
}

// 批量读取
public List<String> batchGet(List<String> keys) {
List<Object> results = redis.executePipelined((RedisCallback<Object>) connection -> {
keys.forEach(key -> {
connection.stringCommands().get(key.getBytes());
});
return null;
});

return results.stream()
.map(obj -> obj != null ? new String((byte[]) obj) : null)
.collect(Collectors.toList());
}

// 批量Hash操作
public void batchHashSet(Map<String, Map<String, String>> data) {
redis.executePipelined((RedisCallback<Object>) connection -> {
data.forEach((key, fields) -> {
Map<byte[], byte[]> byteFields = new HashMap<>();
fields.forEach((field, value) -> {
byteFields.put(field.getBytes(), value.getBytes());
});
connection.hashCommands().hMSet(key.getBytes(), byteFields);
});
return null;
});
}
}

#

3.3 Pipeline + 批量命令结合

// 最优方案:Pipeline内使用批量命令
public void optimalBatchSet(List<Map<String, String>> dataList) {
redis.executePipelined((RedisCallback<Object>) connection -> {
for (Map<String, String> data : dataList) {
// 每100个key一批
List<String> keys = new ArrayList<>(data.keySet());
for (int i = 0; i < keys.size(); i += 100) {
List<String> batch = keys.subList(i, Math.min(i + 100, keys.size()));
Map<byte[], byte[]> byteData = new HashMap<>();
batch.forEach(key -> {
byteData.put(key.getBytes(), data.get(key).getBytes());
});
connection.stringCommands().mSet(byteData);
}
}
return null;
});
}

#

3.4 注意事项

// 1. Pipeline不保证原子性
// 如果中间某个命令失败,其他命令仍会执行

// 2. Pipeline会占用连接
// 大量命令会占用连接较长时间

// 3. 建议分批使用Pipeline
public void batchWithChunks(List<String> keys, int chunkSize) {
for (int i = 0; i < keys.size(); i += chunkSize) {
List<String> chunk = keys.subList(i, Math.min(i + chunkSize, keys.size()));

List<Object> results = redis.executePipelined((RedisCallback<Object>) connection -> {
chunk.forEach(key -> connection.stringCommands().get(key.getBytes()));
return null;
});

// 处理本批次结果
processResults(results);
}
}

// 4. Pipeline在Cluster下的限制
// 命令需要发送到同一节点
// 可以使用Hash Tag让相关key在同一slot

四、Lua脚本批量操作

#

4.1 原子性批量操作

-- 批量删除匹配模式的key
local pattern = KEYS[1]
local cursor = "0"
local count = 0

repeat
local result = redis.call('scan', cursor, 'match', pattern, 'count', 100)
cursor = result[1]
local keys = result[2]

for i = 1, #keys do
redis.call('del', keys[i])
count = count + 1
end
until cursor == "0"

return count
@Service
public class LuaBatchService {

@Autowired
private StringRedisTemplate redis;

private static final String BATCH_DELETE_SCRIPT =
"local pattern = KEYS[1] " +
"local cursor = '0' " +
"local count = 0 " +
"repeat " +
" local result = redis.call('scan', cursor, 'match', pattern, 'count', 100) " +
" cursor = result[1] " +
" local keys = result[2] " +
" for i = 1, #keys do " +
" redis.call('del', keys[i]) " +
" count = count + 1 " +
" end " +
"until cursor == '0' " +
"return count";

public Long batchDeleteByPattern(String pattern) {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(BATCH_DELETE_SCRIPT, Long.class);
return redis.execute(script, Collections.singletonList(pattern));
}
}

五、性能对比

#

5.1 测试场景

场景:写入10000个key

方案1:循环单条写入
for (i = 0; i < 10000; i++) {
redis.set("key" + i, "value" + i);
}
耗时:约 20秒(假设RTT=1ms)

方案2:MSET(每100个一批)
for (i = 0; i < 100; i++) {
Map<String, String> batch = ...;
redis.mset(batch); // 每批100个
}
耗时:约 200ms

方案3:Pipeline
redis.executePipelined(callback -> {
for (i = 0; i < 10000; i++) {
connection.set(("key" + i).getBytes(), ("value" + i).getBytes());
}
});
耗时:约 100ms

方案4:Pipeline + MSET
redis.executePipelined(callback -> {
for (i = 0; i < 100; i++) {
// 每100个用MSET
callback.mSet(batchData);
}
});
耗时:约 50ms

#

5.2 性能对比表

方案 10000次写入 特点
单条循环 ~20秒 最慢,大量网络往返
MSET(100一批) ~200ms 好,但有批量限制
Pipeline ~100ms 很好,减少网络往返
Pipeline+MSET ~50ms 最好,结合两种优势

六、生产实践

#

6.1 批量导入数据

@Service
public class DataImportService {

@Autowired
private StringRedisTemplate redis;

private static final int BATCH_SIZE = 500;
private static final int PIPELINE_SIZE = 1000;

public void importData(List<User> users) {
// 分批使用Pipeline导入
for (int i = 0; i < users.size(); i += PIPELINE_SIZE) {
List<User> batch = users.subList(i, Math.min(i + PIPELINE_SIZE, users.size()));

redis.executePipelined((RedisCallback<Object>) connection -> {
batch.forEach(user -> {
String key = "user:" + user.getId();
String value = JSON.toJSONString(user);
connection.stringCommands().set(
key.getBytes(),
value.getBytes(),
Expiration.from(3600, TimeUnit.SECONDS),
RedisStringCommands.SetOption.UPSERT
);
});
return null;
});

// 每批完成后短暂休息,避免Redis压力过大
if (i + PIPELINE_SIZE < users.size()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}

#

6.2 批量读取优化

@Service
public class BatchReadService {

@Autowired
private StringRedisTemplate redis;

public Map<Long, User> getUsers(List<Long> userIds) {
Map<Long, User> result = new HashMap<>();
List<Long> missingIds = new ArrayList<>();

// 分批Pipeline读取
for (int i = 0; i < userIds.size(); i += 500) {
List<Long> batch = userIds.subList(i, Math.min(i + 500, userIds.size()));
List<String> keys = batch.stream()
.map(id -> "user:" + id)
.collect(Collectors.toList());

List<String> values = redis.opsForValue().multiGet(keys);

for (int j = 0; j < batch.size(); j++) {
String value = values.get(j);
if (value != null) {
result.put(batch.get(j), JSON.parseObject(value, User.class));
} else {
missingIds.add(batch.get(j));
}
}
}

// 从数据库补充未命中的数据
if (!missingIds.isEmpty()) {
List<User> dbUsers = userMapper.findByIds(missingIds);
dbUsers.forEach(user -> {
result.put(user.getId(), user);
redis.opsForValue().set("user:" + user.getId(),
JSON.toJSONString(user), 3600, TimeUnit.SECONDS);
});
}

return result;
}
}

#

6.3 批量删除过期数据

@Service
public class DataCleanupService {

@Autowired
private StringRedisTemplate redis;

// 使用SCAN + Pipeline批量删除
public void cleanupExpiredData(String pattern) {
ScanOptions options = ScanOptions.scanOptions()
.match(pattern)
.count(100)
.build();

Cursor<byte[]> cursor = redis.executeWithStickyConnection(
connection -> connection.scan(options)
);

List<byte[]> keysToDelete = new ArrayList<>();

while (cursor.hasNext()) {
keysToDelete.add(cursor.next());

// 每100个删除一次
if (keysToDelete.size() >= 100) {
deleteBatch(keysToDelete);
keysToDelete.clear();
}
}

// 删除剩余的
if (!keysToDelete.isEmpty()) {
deleteBatch(keysToDelete);
}
}

private void deleteBatch(List<byte[]> keys) {
redis.executePipelined((RedisCallback<Object>) connection -> {
keys.forEach(key -> connection.keyCommands().del(key));
return null;
});
}
}

七、总结

批量方式 原子性 适用场景 性能
MGET/MSET String批量读写
HMGET/HMSET Hash批量读写
Pipeline 大量命令批量执行 很好
Lua脚本 需要原子性的复杂操作

批量操作的核心原则:

  1. 减少网络往返:使用Pipeline或批量命令
  2. 控制批次大小:避免单批次过大导致阻塞
  3. 结合使用:Pipeline内使用批量命令效果更佳
  4. 注意原子性需求:需要原子性时用Lua脚本
  5. Cluster注意:确保key在同一slot或使用Hash Tag

核心要点

  1. String:简单的键值对,适合缓存、计数器

  2. Hash:存储对象属性,适合用户信息、配置

  3. List:有序列表,适合消息队列、最新列表

  4. Set:无序去重,适合共同好友、抽奖

  5. ZSet:有序集合,适合排行榜、积分系统

总结

选择合适的数据结构是使用 Redis 的关键。在实际项目中,根据业务需求选择合适的类型,可以提升性能和开发效率。


   转载规则


《Redis管道Pipeline与批量操作》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录