SpringBoot WebFlux响应式编程
Spring Boot 简化了配置,但日志管理依然需要重视。日志配置、链路追踪、排查思路都是日常开发中会遇到的问题。本文讲实际项目中的日志管理经验。
WebFlux vs MVC
| 特性 |
Spring MVC |
Spring WebFlux |
| 编程模型 |
命令式 |
声明式/响应式 |
| 线程模型 |
每请求一线程 |
事件循环 + 少量线程 |
| 阻塞 |
支持阻塞 |
非阻塞 |
| 底层 |
Servlet API |
Reactive Streams |
| 适用 |
传统 Web |
高并发/流式数据 |
Spring MVC: Spring WebFlux: Thread-per-Request Event Loop ┌───┐ ┌──────────┐ │Req│──Thread1 │Req1 │──Event Loop ├───┤ │Req2 │ ├──Worker │Req│──Thread2 │Req3... │ ├──Worker ├───┤ └──────────┘ └──Worker │Req│──Thread3
|
核心概念
#
Mono 和 Flux
Mono<String> mono = Mono.just("Hello"); Mono<String> empty = Mono.empty(); Mono<String> error = Mono.error(new RuntimeException());
Flux<String> flux = Flux.just("a", "b", "c"); Flux<Integer> range = Flux.range(1, 10); Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
|
#
订阅
mono.subscribe( value -> System.out.println("值: " + value), error -> System.err.println("错误: " + error), () -> System.out.println("完成") );
String result = mono.block();
List<String> list = flux.collectList().block();
|
#
操作符
Flux.just(1, 2, 3, 4, 5) .map(n -> n * n) .filter(n -> n > 10) .take(1) .subscribe(System.out::println);
Mono<String> m1 = Mono.just("Hello"); Mono<String> m2 = Mono.just("World"); Mono<String> combined = Mono.zip(m1, m2, (a, b) -> a + " " + b);
|
创建 WebFlux 应用
#
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
|
#
注解式 Controller
@RestController public class UserController { @Autowired private UserRepository userRepository; @GetMapping("/users") public Flux<User> listUsers() { return userRepository.findAll(); } @GetMapping("/users/{id}") public Mono<User> getUser(@PathVariable Long id) { return userRepository.findById(id); } @PostMapping("/users") public Mono<User> createUser(@RequestBody User user) { return userRepository.save(user); } @DeleteMapping("/users/{id}") public Mono<Void> deleteUser(@PathVariable Long id) { return userRepository.deleteById(id); } }
|
#
函数式路由
@Configuration public class UserRouter { @Bean public RouterFunction<ServerResponse> userRoutes(UserHandler handler) { return RouterFunctions.route() .GET("/users", handler::list) .GET("/users/{id}", handler::get) .POST("/users", handler::create) .DELETE("/users/{id}", handler::delete) .build(); } }
@Component public class UserHandler { @Autowired private UserRepository userRepository; public Mono<ServerResponse> list(ServerRequest request) { return ServerResponse.ok() .body(userRepository.findAll(), User.class); } public Mono<ServerResponse> get(ServerRequest request) { Long id = Long.parseLong(request.pathVariable("id")); return userRepository.findById(id) .flatMap(user -> ServerResponse.ok().bodyValue(user)) .switchIfEmpty(ServerResponse.notFound().build()); } public Mono<ServerResponse> create(ServerRequest request) { return request.bodyToMono(User.class) .flatMap(userRepository::save) .flatMap(user -> ServerResponse.ok().bodyValue(user)); } public Mono<ServerResponse> delete(ServerRequest request) { Long id = Long.parseLong(request.pathVariable("id")); return userRepository.deleteById(id) .then(ServerResponse.ok().build()); } }
|
WebClient
#
创建
WebClient client = WebClient.builder() .baseUrl("https://api.example.com") .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .build();
|
#
GET 请求
Mono<User> user = client.get() .uri("/users/{id}", 1) .retrieve() .bodyToMono(User.class);
Flux<User> users = client.get() .uri("/users") .retrieve() .bodyToFlux(User.class);
|
#
POST 请求
Mono<User> created = client.post() .uri("/users") .bodyValue(newUser) .retrieve() .bodyToMono(User.class);
|
#
错误处理
Mono<User> user = client.get() .uri("/users/{id}", 1) .retrieve() .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new NotFoundException())) .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServerException())) .bodyToMono(User.class);
|
#
超时和重试
Mono<User> user = client.get() .uri("/users/{id}", 1) .retrieve() .bodyToMono(User.class) .timeout(Duration.ofSeconds(5)) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
|
响应式数据访问
#
R2DBC
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-r2dbc</artifactId> </dependency>
|
public interface UserRepository extends ReactiveCrudRepository<User, Long> { Flux<User> findByAgeGreaterThan(int age); Mono<User> findByUsername(String username); }
|
#
ReactiveMongoRepository
public interface OrderRepository extends ReactiveMongoRepository<Order, String> { Flux<Order> findByStatus(String status); }
|
SSE(Server-Sent Events)
@RestController public class SseController { @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> streamEvents() { return Flux.interval(Duration.ofSeconds(1)) .map(seq -> ServerSentEvent.<String>builder() .id(String.valueOf(seq)) .event("message") .data("Event " + seq) .build()); } }
|
适用场景
| 场景 |
推荐 |
| 高并发网关/代理 |
WebFlux |
| 流式数据处理 |
WebFlux |
| 长连接推送(SSE) |
WebFlux |
| 传统 CRUD |
MVC |
| 复杂业务逻辑 |
MVC |
| 团队熟悉阻塞编程 |
MVC |
总结
WebFlux 适合:
WebFlux 不适合:
- 阻塞 JDBC(用 R2DBC 替代)
- 复杂事务场景
- 团队缺乏响应式经验
响应式编程是未来的趋势,但需要根据实际场景选择合适的技术栈。
核心要点
日志级别设置:根据环境设置合适的级别
日志格式配置:添加 traceId 便于链路追踪
日志输出:控制台输出和文件输出的配置
日志归档:设置滚动策略和保留时间
总结
日志是排查问题的生命线,合理配置日志可以提升排查效率。在实际项目中,结合 ELK 等工具搭建日志系统,可以更好地管理和分析日志。