Spring WebClient 异步 HTTP 调用
Spring 5 引入的 WebClient 是新一代 HTTP 客户端,用于替代传统的 RestTemplate,支持响应式编程。
WebClient vs RestTemplate
| 特性 | RestTemplate | WebClient |
| --- | --- | --- |
| 编程模型 | 同步阻塞 | 异步非阻塞 |
| 底层 | Apache HttpClient / JDK | Reactor Netty |
| 线程 | 每请求一线程 | 事件循环 |
| 流式响应 | 不支持 | 支持 |
| 性能 | 一般 | 高并发更优 |
| 维护状态 | 维护模式(不再增加新特性,并未被标记为弃用) | 推荐使用 |
创建 WebClient
基础创建
// The three declarations below are alternative ways to obtain a client; use only one of them.
// Alternative 1: factory call with no arguments (no base URL is supplied here).
WebClient client = WebClient.create();
// Alternative 2: factory call that is given the API root URL as its only argument.
WebClient client = WebClient.create("https://api.example.com");
// Alternative 3: builder form that sets the base URL plus two default headers
// (Content-Type and a Bearer Authorization header). `token` is not defined in this snippet.
WebClient client = WebClient.builder() .baseUrl("https://api.example.com") .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + token) .build();
|
全局配置
@Configuration public class WebClientConfig { @Bean public WebClient webClient() { return WebClient.builder() .baseUrl("https://api.example.com") .defaultHeaders(headers -> { headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); headers.add(HttpHeaders.USER_AGENT, "MyApp/1.0"); }) .filter(logRequest()) .filter(logResponse()) .build(); } private ExchangeFilterFunction logRequest() { return ExchangeFilterFunction.ofRequestProcessor(request -> { System.out.println("Request: " + request.method() + " " + request.url()); return Mono.just(request); }); } private ExchangeFilterFunction logResponse() { return ExchangeFilterFunction.ofResponseProcessor(response -> { System.out.println("Response: " + response.statusCode()); return Mono.just(response); }); } }
|
GET 请求
@Service public class UserApiService { @Autowired private WebClient webClient; public Mono<User> getUser(Long id) { return webClient.get() .uri("/users/{id}", id) .retrieve() .bodyToMono(User.class); } public Flux<User> listUsers() { return webClient.get() .uri("/users") .retrieve() .bodyToFlux(User.class); } public Flux<User> searchUsers(String keyword, int page, int size) { return webClient.get() .uri(uriBuilder -> uriBuilder .path("/users/search") .queryParam("keyword", keyword) .queryParam("page", page) .queryParam("size", size) .build()) .retrieve() .bodyToFlux(User.class); } public Mono<String> getRawData() { return webClient.get() .uri("/data") .retrieve() .bodyToMono(String.class); } }
|
POST/PUT/DELETE
@Service public class OrderApiService { @Autowired private WebClient webClient; public Mono<Order> createOrder(CreateOrderRequest request) { return webClient.post() .uri("/orders") .bodyValue(request) .retrieve() .bodyToMono(Order.class); } public Mono<String> submitForm(Map<String, String> formData) { return webClient.post() .uri("/form") .contentType(MediaType.APPLICATION_FORM_URLENCODED) .bodyValue(formData) .retrieve() .bodyToMono(String.class); } public Mono<Order> updateOrder(Long id, UpdateOrderRequest request) { return webClient.put() .uri("/orders/{id}", id) .bodyValue(request) .retrieve() .bodyToMono(Order.class); } public Mono<Void> deleteOrder(Long id) { return webClient.delete() .uri("/orders/{id}", id) .retrieve() .bodyToMono(Void.class); } public Mono<Order> patchOrder(Long id, Map<String, Object> updates) { return webClient.patch() .uri("/orders/{id}", id) .bodyValue(updates) .retrieve() .bodyToMono(Order.class); } }
|
错误处理
@Service public class ApiService { public Mono<User> getUserSafe(Long id) { return webClient.get() .uri("/users/{id}", id) .retrieve() .onStatus(HttpStatusCode::is4xxClientError, response -> { if (response.statusCode() == HttpStatus.NOT_FOUND) { return Mono.error(new NotFoundException("用户不存在")); } return Mono.error(new BadRequestException("请求错误")); }) .onStatus(HttpStatusCode::is5xxServerError, response -> Mono.error(new ServerException("服务器错误"))) .bodyToMono(User.class) .switchIfEmpty(Mono.error(new NotFoundException("用户不存在"))) .onErrorResume(NotFoundException.class, e -> { log.warn("用户不存在: {}", id); return Mono.just(new User()); }); } }
|
超时与重试
@Service public class ResilientApiService { public Mono<User> getUserWithRetry(Long id) { return webClient.get() .uri("/users/{id}", id) .retrieve() .bodyToMono(User.class) .timeout(Duration.ofSeconds(5)) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) .filter(throwable -> throwable instanceof TimeoutException) .doAfterRetry(retrySignal -> log.warn("重试第{}次", retrySignal.totalRetries()))); } public Mono<Order> createOrderWithRetry(CreateOrderRequest request) { return webClient.post() .uri("/orders") .bodyValue(request) .retrieve() .bodyToMono(Order.class) .retryWhen(Retry.fixedDelay(3, Duration.ofMillis(500)) .filter(throwable -> { return throwable instanceof IOException || (throwable instanceof WebClientResponseException && ((WebClientResponseException) throwable).getStatusCode() == HttpStatus.SERVICE_UNAVAILABLE); }) .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> new ServiceUnavailableException("服务暂时不可用"))); } }
|
并发请求
/**
 * Examples of issuing several remote calls together and combining their results.
 *
 * <p>NOTE(review): {@code getUser}, {@code getOrders} and {@code getWallet} are called but
 * not declared in this class as shown; they are presumably defined elsewhere.
 */
@Service
public class ParallelApiService {

    /**
     * Combines the user, the collected order list and the wallet for {@code userId} into
     * one {@code AggregateData} once all three sources have produced a value.
     *
     * @param userId identifier passed to each of the three lookups
     * @return a {@code Mono} emitting the populated aggregate
     */
    public Mono<AggregateData> fetchAggregateData(Long userId) {
        return Mono.zip(
                        getUser(userId),
                        getOrders(userId).collectList(),
                        getWallet(userId))
                .map(parts -> {
                    AggregateData aggregate = new AggregateData();
                    aggregate.setUser(parts.getT1());
                    aggregate.setOrders(parts.getT2());
                    aggregate.setWallet(parts.getT3());
                    return aggregate;
                });
    }

    /**
     * Looks up every id through {@code getUser}, with {@code flatMap} concurrency capped at 50.
     *
     * <p>NOTE(review): each lookup is subscribed on {@code Schedulers.boundedElastic()};
     * if {@code getUser} is a non-blocking WebClient call this hop is probably unnecessary — confirm.
     *
     * @param ids identifiers to look up
     * @return a {@code Flux} of users, emitted as the lookups complete
     */
    public Flux<User> getUsersParallel(List<Long> ids) {
        Flux<Long> idStream = Flux.fromIterable(ids);
        return idStream.flatMap(userId -> {
            Mono<User> lookup = getUser(userId);
            return lookup.subscribeOn(Schedulers.boundedElastic());
        }, 50);
    }
}
|
文件上传/下载
/**
 * File upload and download examples.
 *
 * <p>NOTE(review): {@code webClient} is used but not declared in this class as shown;
 * it is presumably provided elsewhere.
 */
@Service
public class FileService {

    /**
     * Forwards {@code file} to {@code /upload} as the {@code file} part of a
     * {@code multipart/form-data} request.
     *
     * <p>The filename is set through the part builder instead of a hand-concatenated
     * {@code Content-Disposition} string: the manual value was unquoted and unescaped
     * (breaking on spaces, semicolons or quotes in the name) and was appended on top of
     * the disposition header the builder already manages for the part.
     *
     * @param file uploaded part whose content and filename are forwarded
     * @return a {@code Mono} emitting the response body text
     */
    public Mono<String> uploadFile(FilePart file) {
        MultipartBodyBuilder builder = new MultipartBodyBuilder();
        builder.part("file", file)
                .filename(file.filename());

        return webClient.post()
                .uri("/upload")
                .contentType(MediaType.MULTIPART_FORM_DATA)
                .bodyValue(builder.build())
                .retrieve()
                .bodyToMono(String.class);
    }

    /**
     * Downloads {@code /files/{id}} into a single byte array.
     *
     * @param fileId value substituted for the {@code {id}} URI variable
     * @return a {@code Mono} emitting the whole file content
     */
    public Mono<byte[]> downloadFile(String fileId) {
        return webClient.get()
                .uri("/files/{id}", fileId)
                .retrieve()
                .bodyToMono(byte[].class);
    }

    /**
     * Downloads {@code /files/{id}} as a stream of {@code DataBuffer} chunks instead of
     * one array.
     *
     * @param fileId value substituted for the {@code {id}} URI variable
     * @return a {@code Flux} of data buffers
     */
    public Flux<DataBuffer> downloadLargeFile(String fileId) {
        return webClient.get()
                .uri("/files/{id}", fileId)
                .retrieve()
                .bodyToFlux(DataBuffer.class);
    }
}
|
完整配置示例
/**
 * Builds a {@link WebClient} on a Reactor Netty connector with explicit timeouts.
 *
 * <p>Configured values: 5000 ms connect timeout, 10 s response timeout, and read/write
 * timeout handlers constructed with the value 10. The client also gets a base URL, JSON
 * {@code Content-Type} and {@code Accept} default headers, and a debug-level request logger.
 *
 * @param resourceFactory factory handed to the connector constructor
 * @return the configured client
 */
@Bean
public WebClient webClient(ReactorResourceFactory resourceFactory) {
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(
            resourceFactory,
            httpClient -> httpClient
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .responseTimeout(Duration.ofSeconds(10))
                    .doOnConnected(connection -> {
                        // Same order as before: read-timeout handler first, then write-timeout handler.
                        connection.addHandlerLast(new ReadTimeoutHandler(10));
                        connection.addHandlerLast(new WriteTimeoutHandler(10));
                    }));

    ExchangeFilterFunction requestLogger = ExchangeFilterFunction.ofRequestProcessor(outgoing -> {
        log.debug("Request: {} {}", outgoing.method(), outgoing.url());
        return Mono.just(outgoing);
    });

    WebClient.Builder builder = WebClient.builder();
    builder.clientConnector(connector);
    builder.baseUrl("https://api.example.com");
    builder.defaultHeaders(defaults -> {
        defaults.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
        defaults.add(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE);
    });
    builder.filter(requestLogger);
    return builder.build();
}
|
总结
| 场景 | 方案 |
| --- | --- |
| 简单 GET | `webClient.get().uri(...).retrieve().bodyToMono(...)` |
| 带参数 | `uri(builder -> builder.queryParam(...))` |
| POST JSON | `.bodyValue(object)` |
| 错误处理 | `.onStatus(...)` + `.onErrorResume(...)` |
| 超时 | `.timeout(Duration)` |
| 重试 | `.retryWhen(Retry...)` |
| 并发 | `Mono.zip(...)` 或 `Flux.flatMap(...)` |
WebClient 是 Spring 生态中推荐的 HTTP 客户端,特别适合微服务间的高并发调用场景。