SpringBoot WebFlux响应式编程

SpringBoot WebFlux响应式编程

Spring Boot 简化了配置,但日志管理依然需要重视。日志配置、链路追踪、排查思路都是日常开发中会遇到的问题。本文讲实际项目中的日志管理经验。

WebFlux vs MVC

特性 Spring MVC Spring WebFlux
编程模型 命令式 声明式/响应式
线程模型 每请求一线程 事件循环 + 少量线程
阻塞 支持阻塞 非阻塞
底层 Servlet API Reactive Streams
适用 传统 Web 高并发/流式数据
Spring MVC:        Spring WebFlux:
Thread-per-Request Event Loop
┌───┐ ┌──────────┐
│Req│──Thread1 │Req1 │──Event Loop
├───┤ │Req2 │ ├──Worker
│Req│──Thread2 │Req3... │ ├──Worker
├───┤ └──────────┘ └──Worker
│Req│──Thread3

核心概念

#

Mono 和 Flux

// Mono: 0 或 1 个元素
Mono<String> mono = Mono.just("Hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException());

// Flux: 0 到 N 个元素
Flux<String> flux = Flux.just("a", "b", "c");
Flux<Integer> range = Flux.range(1, 10);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));

#

订阅

// 订阅并消费
mono.subscribe(
value -> System.out.println("值: " + value),
error -> System.err.println("错误: " + error),
() -> System.out.println("完成")
);

// 阻塞获取
String result = mono.block();

// 转换为 List
List<String> list = flux.collectList().block();

#

操作符

Flux.just(1, 2, 3, 4, 5)
.map(n -> n * n) // 转换: 1, 4, 9, 16, 25
.filter(n -> n > 10) // 过滤: 16, 25
.take(1) // 取前1个: 16
.subscribe(System.out::println);

// 合并
Mono<String> m1 = Mono.just("Hello");
Mono<String> m2 = Mono.just("World");
Mono<String> combined = Mono.zip(m1, m2, (a, b) -> a + " " + b);

创建 WebFlux 应用

#

依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

#

注解式 Controller

@RestController
public class UserController {

@Autowired
private UserRepository userRepository;

@GetMapping("/users")
public Flux<User> listUsers() {
return userRepository.findAll();
}

@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userRepository.findById(id);
}

@PostMapping("/users")
public Mono<User> createUser(@RequestBody User user) {
return userRepository.save(user);
}

@DeleteMapping("/users/{id}")
public Mono<Void> deleteUser(@PathVariable Long id) {
return userRepository.deleteById(id);
}
}

#

函数式路由

@Configuration
public class UserRouter {

@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions.route()
.GET("/users", handler::list)
.GET("/users/{id}", handler::get)
.POST("/users", handler::create)
.DELETE("/users/{id}", handler::delete)
.build();
}
}

@Component
public class UserHandler {
@Autowired
private UserRepository userRepository;

public Mono<ServerResponse> list(ServerRequest request) {
return ServerResponse.ok()
.body(userRepository.findAll(), User.class);
}

public Mono<ServerResponse> get(ServerRequest request) {
Long id = Long.parseLong(request.pathVariable("id"));
return userRepository.findById(id)
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}

public Mono<ServerResponse> create(ServerRequest request) {
return request.bodyToMono(User.class)
.flatMap(userRepository::save)
.flatMap(user -> ServerResponse.ok().bodyValue(user));
}

public Mono<ServerResponse> delete(ServerRequest request) {
Long id = Long.parseLong(request.pathVariable("id"));
return userRepository.deleteById(id)
.then(ServerResponse.ok().build());
}
}

WebClient

#

创建

WebClient client = WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();

#

GET 请求

// 获取单个对象
Mono<User> user = client.get()
.uri("/users/{id}", 1)
.retrieve()
.bodyToMono(User.class);

// 获取列表
Flux<User> users = client.get()
.uri("/users")
.retrieve()
.bodyToFlux(User.class);

#

POST 请求

Mono<User> created = client.post()
.uri("/users")
.bodyValue(newUser)
.retrieve()
.bodyToMono(User.class);

#

错误处理

Mono<User> user = client.get()
.uri("/users/{id}", 1)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response ->
Mono.error(new NotFoundException()))
.onStatus(HttpStatus::is5xxServerError, response ->
Mono.error(new ServerException()))
.bodyToMono(User.class);

#

超时和重试

Mono<User> user = client.get()
.uri("/users/{id}", 1)
.retrieve()
.bodyToMono(User.class)
.timeout(Duration.ofSeconds(5))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));

响应式数据访问

#

R2DBC

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByAgeGreaterThan(int age);
Mono<User> findByUsername(String username);
}

#

ReactiveMongoRepository

public interface OrderRepository extends ReactiveMongoRepository<Order, String> {
Flux<Order> findByStatus(String status);
}

SSE(Server-Sent Events)

@RestController
public class SseController {

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<String>builder()
.id(String.valueOf(seq))
.event("message")
.data("Event " + seq)
.build());
}
}

适用场景

场景 推荐
高并发网关/代理 WebFlux
流式数据处理 WebFlux
长连接推送(SSE) WebFlux
传统 CRUD MVC
复杂业务逻辑 MVC
团队熟悉阻塞编程 MVC

总结

WebFlux 适合:

  • 高并发、低延迟场景
  • 流式数据推送
  • 微服务网关

WebFlux 不适合:

  • 阻塞 JDBC(用 R2DBC 替代)
  • 复杂事务场景
  • 团队缺乏响应式经验

响应式编程是未来的趋势,但需要根据实际场景选择合适的技术栈。

核心要点

  1. 日志级别设置:根据环境设置合适的级别

  2. 日志格式配置:添加 traceId 便于链路追踪

  3. 日志输出:控制台输出和文件输出的配置

  4. 日志归档:设置滚动策略和保留时间

总结

日志是排查问题的生命线,合理配置日志可以提升排查效率。在实际项目中,结合 ELK 等工具搭建日志系统,可以更好地管理和分析日志。


   转载规则


《SpringBoot WebFlux响应式编程》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录