SpringWebClient异步HTTP调用

SpringWebClient异步HTTP调用

Spring 5 引入的 WebClient 是新一代 HTTP 客户端,用于替代传统的 RestTemplate,支持响应式编程。

WebClient vs RestTemplate

特性 RestTemplate WebClient
编程模型 同步阻塞 异步非阻塞
底层 Apache HttpClient / JDK Reactor Netty
线程 每请求一线程 事件循环
流式响应 不支持 支持
性能 一般 高并发更优
维护状态 维护模式(不再新增功能) 推荐使用

创建 WebClient

基础创建

// 方式1:使用工厂
WebClient client = WebClient.create();

// 方式2:指定基础URL
WebClient client = WebClient.create("https://api.example.com");

// 方式3:自定义构建
WebClient client = WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + token)
.build();

全局配置

@Configuration
public class WebClientConfig {

@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeaders(headers -> {
headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
headers.add(HttpHeaders.USER_AGENT, "MyApp/1.0");
})
.filter(logRequest())
.filter(logResponse())
.build();
}

private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(request -> {
System.out.println("Request: " + request.method() + " " + request.url());
return Mono.just(request);
});
}

private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(response -> {
System.out.println("Response: " + response.statusCode());
return Mono.just(response);
});
}
}

GET 请求

@Service
public class UserApiService {

@Autowired
private WebClient webClient;

// 获取单个对象
public Mono<User> getUser(Long id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class);
}

// 获取列表
public Flux<User> listUsers() {
return webClient.get()
.uri("/users")
.retrieve()
.bodyToFlux(User.class);
}

// 带查询参数
public Flux<User> searchUsers(String keyword, int page, int size) {
return webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/users/search")
.queryParam("keyword", keyword)
.queryParam("page", page)
.queryParam("size", size)
.build())
.retrieve()
.bodyToFlux(User.class);
}

// 获取 String
public Mono<String> getRawData() {
return webClient.get()
.uri("/data")
.retrieve()
.bodyToMono(String.class);
}
}

POST/PUT/DELETE

@Service
public class OrderApiService {

@Autowired
private WebClient webClient;

// POST
public Mono<Order> createOrder(CreateOrderRequest request) {
return webClient.post()
.uri("/orders")
.bodyValue(request)
.retrieve()
.bodyToMono(Order.class);
}

// POST with form data
public Mono<String> submitForm(Map<String, String> formData) {
return webClient.post()
.uri("/form")
.contentType(MediaType.APPLICATION_FORM_URLENCODED)
.bodyValue(formData)
.retrieve()
.bodyToMono(String.class);
}

// PUT
public Mono<Order> updateOrder(Long id, UpdateOrderRequest request) {
return webClient.put()
.uri("/orders/{id}", id)
.bodyValue(request)
.retrieve()
.bodyToMono(Order.class);
}

// DELETE
public Mono<Void> deleteOrder(Long id) {
return webClient.delete()
.uri("/orders/{id}", id)
.retrieve()
.bodyToMono(Void.class);
}

// PATCH
public Mono<Order> patchOrder(Long id, Map<String, Object> updates) {
return webClient.patch()
.uri("/orders/{id}", id)
.bodyValue(updates)
.retrieve()
.bodyToMono(Order.class);
}
}

错误处理

@Service
public class ApiService {

public Mono<User> getUserSafe(Long id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
// 4xx 错误处理
.onStatus(HttpStatusCode::is4xxClientError, response -> {
if (response.statusCode() == HttpStatus.NOT_FOUND) {
return Mono.error(new NotFoundException("用户不存在"));
}
return Mono.error(new BadRequestException("请求错误"));
})
// 5xx 错误处理
.onStatus(HttpStatusCode::is5xxServerError, response ->
Mono.error(new ServerException("服务器错误")))
.bodyToMono(User.class)
// 空值处理
.switchIfEmpty(Mono.error(new NotFoundException("用户不存在")))
// 异常恢复
.onErrorResume(NotFoundException.class, e -> {
log.warn("用户不存在: {}", id);
return Mono.just(new User()); // 返回默认值
});
}
}

超时与重试

@Service
public class ResilientApiService {

public Mono<User> getUserWithRetry(Long id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class)
.timeout(Duration.ofSeconds(5)) // 超时5秒
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(throwable -> throwable instanceof TimeoutException)
.doAfterRetry(retrySignal ->
log.warn("重试第{}次", retrySignal.totalRetries())));
}

// 更复杂的重试策略
public Mono<Order> createOrderWithRetry(CreateOrderRequest request) {
return webClient.post()
.uri("/orders")
.bodyValue(request)
.retrieve()
.bodyToMono(Order.class)
.retryWhen(Retry.fixedDelay(3, Duration.ofMillis(500))
.filter(throwable -> {
// 只对特定错误重试
return throwable instanceof IOException ||
(throwable instanceof WebClientResponseException &&
((WebClientResponseException) throwable).getStatusCode() == HttpStatus.SERVICE_UNAVAILABLE);
})
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) ->
new ServiceUnavailableException("服务暂时不可用")));
}
}

并发请求

/**
 * Demonstrates combining several reactive calls concurrently.
 */
@Service
public class ParallelApiService {

    /**
     * Loads the user, their orders and their wallet, and merges the three
     * results into one {@code AggregateData}.
     *
     * @param userId id passed to each of the three lookups
     * @return the aggregate, emitted once all three sources have produced a value
     */
    public Mono<AggregateData> fetchAggregateData(Long userId) {
        return Mono.zip(getUser(userId), getOrders(userId).collectList(), getWallet(userId))
            .map(parts -> {
                AggregateData aggregate = new AggregateData();
                aggregate.setUser(parts.getT1());
                aggregate.setOrders(parts.getT2());
                aggregate.setWallet(parts.getT3());
                return aggregate;
            });
    }

    /**
     * Fetches many users with bounded concurrency.
     *
     * @param ids ids to look up
     * @return the users as they arrive; flatMap does not preserve input order
     */
    public Flux<User> getUsersParallel(List<Long> ids) {
        // Upper bound on the number of in-flight lookups.
        final int maxConcurrency = 50;
        return Flux.fromIterable(ids)
            .flatMap(
                userId -> getUser(userId).subscribeOn(Schedulers.boundedElastic()),
                maxConcurrency);
    }
}

文件上传/下载

/**
 * Demonstrates multipart upload and file download with WebClient.
 */
@Service
public class FileService {

    // Declared here for consistency with the other services in this file;
    // all methods below use it.
    @Autowired
    private WebClient webClient;

    /**
     * Uploads a received file part as a multipart form field named {@code file}.
     *
     * <p>The filename is set through the part builder instead of a hand-written
     * {@code Content-Disposition} string. The manual header left the filename
     * unquoted (breaking on spaces or semicolons) and was added on top of the
     * disposition the builder manages for the part.
     *
     * @param file the part to forward to {@code /upload}
     * @return the response body as a {@code String}
     */
    public Mono<String> uploadFile(FilePart file) {
        MultipartBodyBuilder builder = new MultipartBodyBuilder();
        builder.part("file", file)
            .filename(file.filename());

        return webClient.post()
            .uri("/upload")
            .contentType(MediaType.MULTIPART_FORM_DATA)
            .bodyValue(builder.build())
            .retrieve()
            .bodyToMono(String.class);
    }

    /**
     * Downloads a file fully into memory.
     *
     * @param fileId value substituted into the {@code /files/{id}} template
     * @return the whole response body as one byte array
     */
    public Mono<byte[]> downloadFile(String fileId) {
        return webClient.get()
            .uri("/files/{id}", fileId)
            .retrieve()
            .bodyToMono(byte[].class);
    }

    /**
     * Downloads a file as a stream of buffers instead of one array.
     *
     * <p>NOTE(review): consumers of raw {@code DataBuffer}s are normally
     * responsible for releasing them — confirm callers do so.
     *
     * @param fileId value substituted into the {@code /files/{id}} template
     * @return the response body as a stream of {@code DataBuffer}
     */
    public Flux<DataBuffer> downloadLargeFile(String fileId) {
        return webClient.get()
            .uri("/files/{id}", fileId)
            .retrieve()
            .bodyToFlux(DataBuffer.class);
    }
}

完整配置示例

/**
 * Builds a {@link WebClient} with explicit connection, response, read and
 * write timeouts, JSON default headers and a debug request-logging filter.
 *
 * @param resourceFactory factory handed to the Reactor connector
 * @return the configured client bound to {@code https://api.example.com}
 */
@Bean
public WebClient webClient(ReactorResourceFactory resourceFactory) {

    // Transport layer: 5000 ms connect timeout, 10 s response timeout,
    // plus Netty read/write timeout handlers constructed with 10.
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(
        resourceFactory,
        httpClient -> httpClient
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
            .responseTimeout(Duration.ofSeconds(10))
            .doOnConnected(connection -> {
                connection.addHandlerLast(new ReadTimeoutHandler(10));
                connection.addHandlerLast(new WriteTimeoutHandler(10));
            }));

    // Logs method and URL of each outgoing request at debug level.
    ExchangeFilterFunction requestLogger =
        ExchangeFilterFunction.ofRequestProcessor(outgoing -> {
            log.debug("Request: {} {}", outgoing.method(), outgoing.url());
            return Mono.just(outgoing);
        });

    WebClient.Builder builder = WebClient.builder();
    builder.clientConnector(connector);
    builder.baseUrl("https://api.example.com");
    builder.defaultHeaders(defaults -> {
        defaults.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
        defaults.add(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE);
    });
    builder.filter(requestLogger);
    return builder.build();
}

总结

场景 方案
简单 GET webClient.get().uri(...).retrieve().bodyToMono(...)
带参数 uri(builder -> builder.queryParam(...))
POST JSON .bodyValue(object)
错误处理 .onStatus(...) + .onErrorResume(...)
超时 .timeout(Duration)
重试 .retryWhen(Retry...)
并发 Mono.zip(...) / Flux.flatMap(...)

WebClient 是 Spring 生态中推荐的 HTTP 客户端,特别适合微服务间的高并发调用场景。


   转载规则


《SpringWebClient异步HTTP调用》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录