Redis管道Pipeline与批量操作 Redis 的五种数据结构各有特色,用对了才能发挥它的优势。很多人只用到了 String 和 Hash,却不知道 List、Set、ZSet 在特定场景下更合适。本文从应用场景出发,讲什么时候用什么类型。
一、为什么需要批量操作 #
1.1 网络往返的开销 单独执行1000个GET命令: 客户端 网络 Redis │──GET 1──> 1ms ──> │ │<─结果1── 1ms <── │ │──GET 2──> 1ms ──> │ │<─结果2── 1ms <── │ ... │──GET 1000─> 1ms ──> │ │<─结果1000 1ms <── │ 总时间 = 1000 × 2ms = 2000ms
#
1.2 批量操作的优势 使用Pipeline执行1000个GET: 客户端 网络 Redis │──GET 1──> 1ms ──> │ │──GET 2──> │ │──GET 3──> │ │ ... │ │──GET 1000 │ │ 1ms <── │ │<─结果1─2─3...1000 │ 总时间 ≈ 2ms + 处理时间
二、原生批量命令 #
2.1 MGET/MSET MGET key1 key2 key3 key4 key5 MSET key1 value1 key2 value2 key3 value3 MSET key1 value1 key2 value2 EX 60
Java使用 :
@Autowired private StringRedisTemplate redis;List<String> values = redis.opsForValue().multiGet(Arrays.asList("key1" , "key2" , "key3" )); Map<String, String> map = new HashMap <>(); map.put("key1" , "value1" ); map.put("key2" , "value2" ); redis.opsForValue().multiSet(map);
限制 :
只能操作String类型
所有key需要在同一slot(Cluster模式下)
#
2.2 HMGET/HMSET/HGETALL HMSET user:1001 name "Alice" age 25 city "Beijing" HMGET user:1001 name age city HGETALL user:1001
#
2.3 LPUSH/RPUSH(批量) LPUSH mylist value1 value2 value3 value4 value5
#
2.4 SADD/ZADD(批量) SADD myset member1 member2 member3 ZADD myzset 1 member1 2 member2 3 member3
#
2.5 DEL/UNLINK(批量) DEL key1 key2 key3 key4 key5 UNLINK key1 key2 key3
三、Pipeline管道 #
3.1 Pipeline原理 普通模式: Pipeline模式: 请求1 ──> 请求1 ──> <── 响应1 请求2 ──> 请求2 ──> 请求3 ──> <── 响应2 ... 请求3 ──> <── 响应1,2,3... <── 响应3
Pipeline不保证原子性,只是将多个命令打包发送,减少网络往返。
#
3.2 Pipeline使用 @Service public class PipelineService { @Autowired private StringRedisTemplate redis; public void batchSet (Map<String, String> data) { redis.executePipelined((RedisCallback<Object>) connection -> { data.forEach((key, value) -> { connection.stringCommands().set( key.getBytes(), value.getBytes() ); }); return null ; }); } public List<String> batchGet (List<String> keys) { List<Object> results = redis.executePipelined((RedisCallback<Object>) connection -> { keys.forEach(key -> { connection.stringCommands().get(key.getBytes()); }); return null ; }); return results.stream() .map(obj -> obj != null ? new String ((byte []) obj) : null ) .collect(Collectors.toList()); } public void batchHashSet (Map<String, Map<String, String>> data) { redis.executePipelined((RedisCallback<Object>) connection -> { data.forEach((key, fields) -> { Map<byte [], byte []> byteFields = new HashMap <>(); fields.forEach((field, value) -> { byteFields.put(field.getBytes(), value.getBytes()); }); connection.hashCommands().hMSet(key.getBytes(), byteFields); }); return null ; }); } }
#
3.3 Pipeline + 批量命令结合 public void optimalBatchSet (List<Map<String, String>> dataList) { redis.executePipelined((RedisCallback<Object>) connection -> { for (Map<String, String> data : dataList) { List<String> keys = new ArrayList <>(data.keySet()); for (int i = 0 ; i < keys.size(); i += 100 ) { List<String> batch = keys.subList(i, Math.min(i + 100 , keys.size())); Map<byte [], byte []> byteData = new HashMap <>(); batch.forEach(key -> { byteData.put(key.getBytes(), data.get(key).getBytes()); }); connection.stringCommands().mSet(byteData); } } return null ; }); }
#
3.4 注意事项 public void batchWithChunks (List<String> keys, int chunkSize) { for (int i = 0 ; i < keys.size(); i += chunkSize) { List<String> chunk = keys.subList(i, Math.min(i + chunkSize, keys.size())); List<Object> results = redis.executePipelined((RedisCallback<Object>) connection -> { chunk.forEach(key -> connection.stringCommands().get(key.getBytes())); return null ; }); processResults(results); } }
四、Lua脚本批量操作 #
4.1 原子性批量操作 local pattern = KEYS[1 ]local cursor = "0" local count = 0 repeat local result = redis.call('scan' , cursor, 'match' , pattern, 'count' , 100 ) cursor = result[1 ] local keys = result[2 ] for i = 1 , #keys do redis.call('del' , keys[i]) count = count + 1 end until cursor == "0" return count
@Service public class LuaBatchService { @Autowired private StringRedisTemplate redis; private static final String BATCH_DELETE_SCRIPT = "local pattern = KEYS[1] " + "local cursor = '0' " + "local count = 0 " + "repeat " + " local result = redis.call('scan', cursor, 'match', pattern, 'count', 100) " + " cursor = result[1] " + " local keys = result[2] " + " for i = 1, #keys do " + " redis.call('del', keys[i]) " + " count = count + 1 " + " end " + "until cursor == '0' " + "return count" ; public Long batchDeleteByPattern (String pattern) { DefaultRedisScript<Long> script = new DefaultRedisScript <>(BATCH_DELETE_SCRIPT, Long.class); return redis.execute(script, Collections.singletonList(pattern)); } }
五、性能对比 #
5.1 测试场景 场景:写入10000个key 方案1:循环单条写入 for (i = 0; i < 10000; i++) { redis.set("key" + i, "value" + i); } 耗时:约 20秒(假设RTT=1ms) 方案2:MSET(每100个一批) for (i = 0; i < 100; i++) { Map<String, String> batch = ...; redis.mset(batch); // 每批100个 } 耗时:约 200ms 方案3:Pipeline redis.executePipelined(callback -> { for (i = 0; i < 10000; i++) { connection.set(("key" + i).getBytes(), ("value" + i).getBytes()); } }); 耗时:约 100ms 方案4:Pipeline + MSET redis.executePipelined(callback -> { for (i = 0; i < 100; i++) { // 每100个用MSET callback.mSet(batchData); } }); 耗时:约 50ms
#
5.2 性能对比表
方案
10000次写入
特点
单条循环
~20秒
最慢,大量网络往返
MSET(100一批)
~200ms
好,但有批量限制
Pipeline
~100ms
很好,减少网络往返
Pipeline+MSET
~50ms
最好,结合两种优势
六、生产实践 #
6.1 批量导入数据 @Service public class DataImportService { @Autowired private StringRedisTemplate redis; private static final int BATCH_SIZE = 500 ; private static final int PIPELINE_SIZE = 1000 ; public void importData (List<User> users) { for (int i = 0 ; i < users.size(); i += PIPELINE_SIZE) { List<User> batch = users.subList(i, Math.min(i + PIPELINE_SIZE, users.size())); redis.executePipelined((RedisCallback<Object>) connection -> { batch.forEach(user -> { String key = "user:" + user.getId(); String value = JSON.toJSONString(user); connection.stringCommands().set( key.getBytes(), value.getBytes(), Expiration.from(3600 , TimeUnit.SECONDS), RedisStringCommands.SetOption.UPSERT ); }); return null ; }); if (i + PIPELINE_SIZE < users.size()) { try { Thread.sleep(10 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } }
#
6.2 批量读取优化 @Service public class BatchReadService { @Autowired private StringRedisTemplate redis; public Map<Long, User> getUsers (List<Long> userIds) { Map<Long, User> result = new HashMap <>(); List<Long> missingIds = new ArrayList <>(); for (int i = 0 ; i < userIds.size(); i += 500 ) { List<Long> batch = userIds.subList(i, Math.min(i + 500 , userIds.size())); List<String> keys = batch.stream() .map(id -> "user:" + id) .collect(Collectors.toList()); List<String> values = redis.opsForValue().multiGet(keys); for (int j = 0 ; j < batch.size(); j++) { String value = values.get(j); if (value != null ) { result.put(batch.get(j), JSON.parseObject(value, User.class)); } else { missingIds.add(batch.get(j)); } } } if (!missingIds.isEmpty()) { List<User> dbUsers = userMapper.findByIds(missingIds); dbUsers.forEach(user -> { result.put(user.getId(), user); redis.opsForValue().set("user:" + user.getId(), JSON.toJSONString(user), 3600 , TimeUnit.SECONDS); }); } return result; } }
#
6.3 批量删除过期数据 @Service public class DataCleanupService { @Autowired private StringRedisTemplate redis; public void cleanupExpiredData (String pattern) { ScanOptions options = ScanOptions.scanOptions() .match(pattern) .count(100 ) .build(); Cursor<byte []> cursor = redis.executeWithStickyConnection( connection -> connection.scan(options) ); List<byte []> keysToDelete = new ArrayList <>(); while (cursor.hasNext()) { keysToDelete.add(cursor.next()); if (keysToDelete.size() >= 100 ) { deleteBatch(keysToDelete); keysToDelete.clear(); } } if (!keysToDelete.isEmpty()) { deleteBatch(keysToDelete); } } private void deleteBatch (List<byte []> keys) { redis.executePipelined((RedisCallback<Object>) connection -> { keys.forEach(key -> connection.keyCommands().del(key)); return null ; }); } }
七、总结
批量方式
原子性
适用场景
性能
MGET/MSET
是
String批量读写
好
HMGET/HMSET
是
Hash批量读写
好
Pipeline
否
大量命令批量执行
很好
Lua脚本
是
需要原子性的复杂操作
好
批量操作的核心原则:
减少网络往返 :使用Pipeline或批量命令
控制批次大小 :避免单批次过大导致阻塞
结合使用 :Pipeline内使用批量命令效果更佳
注意原子性需求 :需要原子性时用Lua脚本
Cluster注意 :确保key在同一slot或使用Hash Tag
核心要点
String:简单的键值对,适合缓存、计数器
Hash:存储对象属性,适合用户信息、配置
List:有序列表,适合消息队列、最新列表
Set:无序去重,适合共同好友、抽奖
ZSet:有序集合,适合排行榜、积分系统
总结 选择合适的数据结构是使用 Redis 的关键。在实际项目中,根据业务需求选择合适的类型,可以提升性能和开发效率。