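Spring @Async Asynchronous Methods
Spring's @Async annotation provides simple support for asynchronous methods: an annotated method is submitted to a thread pool for execution, so the caller is not blocked.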
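As a rough mental model of what "submitted to a thread pool, caller not blocked" means, here is a minimal plain-JDK sketch. It is not how Spring implements @Async internally; the class name `FireAndForgetSketch` and the thread name `async-1` are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class FireAndForgetSketch {

    /** Hands a task to a pool and returns the name of the thread that ran it. */
    static String runOnPool() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-1"));
        CountDownLatch callerMovedOn = new CountDownLatch(1);
        AtomicReference<String> worker = new AtomicReference<>();
        try {
            pool.execute(() -> {
                try {
                    // The task can only finish after the caller has moved past execute().
                    callerMovedOn.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                worker.set(Thread.currentThread().getName());
            });
            // Reached while the task is still pending: execute() did not block the caller.
            callerMovedOn.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return worker.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runOnPool());
    }
}
```

The task runs on the pool thread rather than on the caller's thread, which is the behaviour the annotation gives you declaratively.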
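Enabling async support

    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;

    // Turns on processing of @Async annotations for beans in this context.
    @Configuration
    @EnableAsync
    public class AsyncConfig {
    }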
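Basic usage
The @Async annotation

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Service;

    @Service
    public class EmailService {

        private static final Logger log = LoggerFactory.getLogger(EmailService.class);

        // Fire-and-forget: the caller returns immediately.
        @Async
        public void sendEmail(String to, String subject, String content) {
            log.info("Sending email to: {}, thread: {}", to, Thread.currentThread().getName());
            try {
                Thread.sleep(2000); // simulate a slow send
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.info("Email sent: {}", to);
        }

        // Returns a Future. AsyncResult is deprecated as of Spring Framework 6.0;
        // prefer CompletableFuture in new code.
        @Async
        public Future<String> sendEmailWithResult(String to) {
            log.info("Sending email: {}, thread: {}", to, Thread.currentThread().getName());
            return new AsyncResult<>("Sent: " + to);
        }

        // Returns a CompletableFuture, which callers can compose or wait on.
        @Async
        public CompletableFuture<String> sendEmailAsync(String to) {
            log.info("Sending email: {}, thread: {}", to, Thread.currentThread().getName());
            return CompletableFuture.completedFuture("Sent: " + to);
        }
    }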
Calling the async methods

    @Service
    public class OrderService {

        @Autowired
        private OrderDao orderDao; // used by the original snippet but not declared there

        @Autowired
        private EmailService emailService;

        public void createOrder(OrderRequest request) {
            orderDao.save(request);
            // Returns immediately; the email is sent on a pool thread.
            emailService.sendEmail(request.getEmail(), "Order confirmation", "...");
            System.out.println("Order created");
        }

        public void createOrderWithResult(OrderRequest request) throws Exception {
            orderDao.save(request);
            Future<String> future = emailService.sendEmailWithResult(request.getEmail());
            // Waits at most 5 seconds, then throws TimeoutException.
            String result = future.get(5, TimeUnit.SECONDS);
            System.out.println(result);
        }
    }
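Thread pool configuration
Default thread pool

    @Configuration
    @EnableAsync
    public class AsyncConfig {

        // An Executor bean named "taskExecutor" is picked up by @Async methods
        // that do not name an executor explicitly.
        @Bean(name = "taskExecutor")
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);      // threads kept alive when idle
            executor.setMaxPoolSize(10);      // upper bound, reached only once the queue is full
            executor.setQueueCapacity(100);   // tasks buffered before extra threads are created
            executor.setThreadNamePrefix("async-");
            // When both pool and queue are full, the submitting thread runs the task itself.
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    }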
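The interaction between core size, queue capacity, max size, and the rejection policy is easier to see on a tiny pool. The sketch below uses the JDK's `ThreadPoolExecutor` directly with deliberately small numbers (core 1, max 2, queue 1), not the values from the configuration above; `PoolSizingSketch` and the task numbering are illustrative only.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizingSketch {

    /** Submits four tasks and returns, per task number, the thread that ran it. */
    static Map<Integer, String> submitFourTasks() {
        AtomicInteger threadNo = new AtomicInteger();
        ThreadFactory named = r -> new Thread(r, "async-" + threadNo.incrementAndGet());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), named,
                new ThreadPoolExecutor.CallerRunsPolicy());

        Map<Integer, String> ranOn = new ConcurrentHashMap<>();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Task 1 starts the core thread, task 2 waits in the queue,
            // task 3 finds the queue full and starts a second thread.
            for (int i = 1; i <= 3; i++) {
                final int id = i;
                pool.execute(() -> {
                    ranOn.put(id, Thread.currentThread().getName());
                    await(release); // keep the worker busy until all tasks are submitted
                });
            }
            // Task 4: queue full and max size reached, so CallerRunsPolicy runs it right here.
            pool.execute(() -> ranOn.put(4, Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitFourTasks());
    }
}
```

Note the order: extra threads beyond the core size are created only after the queue fills up, so a large queue capacity can mean the max pool size is never reached.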
Choosing a specific thread pool

    @Configuration
    @EnableAsync
    public class AsyncConfig {

        // Dedicated pool for email tasks.
        @Bean("emailExecutor")
        public Executor emailExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(5);
            executor.setQueueCapacity(50);
            executor.setThreadNamePrefix("email-");
            executor.initialize();
            return executor;
        }

        // Dedicated pool for SMS tasks, so a backlog of emails cannot starve SMS delivery.
        @Bean("smsExecutor")
        public Executor smsExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(5);
            executor.setQueueCapacity(50);
            executor.setThreadNamePrefix("sms-");
            executor.initialize();
            return executor;
        }
    }

    @Service
    public class NotificationService {

        // The annotation value is the name of the Executor bean to use.
        @Async("emailExecutor")
        public void sendEmail(String to) {
        }

        @Async("smsExecutor")
        public void sendSms(String phone) {
        }
    }
Graceful shutdown

    @Bean(name = "taskExecutor", destroyMethod = "shutdown")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        // On shutdown, let running and queued tasks finish instead of interrupting them.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // Wait up to 60 seconds for those tasks before the context continues shutting down.
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
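The two shutdown settings correspond roughly to the familiar JDK idiom of `shutdown()` followed by `awaitTermination(...)`. A minimal sketch of that idiom with a plain `ExecutorService` follows; `GracefulShutdownSketch`, the pool size, and the 50 ms task duration are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownSketch {

    /** Submits taskCount short tasks, shuts down gracefully, returns how many completed. */
    static int drain(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(50); // simulated work
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up waiting and interrupt what is left
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + drain(6));
    }
}
```

Calling `shutdownNow()` right away instead would interrupt running tasks and discard the queued ones, which is the behaviour the graceful settings are meant to avoid.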
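Caveats
1. Same-class calls bypass @Async

    @Service
    public class OrderService {

        public void createOrder() {
            // Plain this.sendEmail(): the call never goes through the Spring proxy,
            // so @Async is ignored and the method runs synchronously.
            sendEmail();
        }

        @Async
        public void sendEmail() {
        }
    }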
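Fix: obtain the proxied bean and call through it.

    @Service
    public class OrderService {

        @Autowired
        private ApplicationContext context;

        public void createOrder() {
            // getBean returns the proxy, so the call is intercepted and runs asynchronously.
            OrderService proxy = context.getBean(OrderService.class);
            proxy.sendEmail();
        }

        @Async
        public void sendEmail() {
        }
    }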
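Why the proxy matters can be shown without Spring at all. The sketch below uses a JDK dynamic proxy as a stand-in for Spring's proxy (Spring may use either JDK proxies or generated subclasses, and its interceptor does much more); `SelfInvocationSketch`, `OrderOps`, and `PlainOrderService` are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SelfInvocationSketch {

    interface OrderOps {
        void createOrder();
        void sendEmail();
    }

    static class PlainOrderService implements OrderOps {
        @Override
        public void createOrder() {
            sendEmail(); // this.sendEmail(): goes straight to the target, not the proxy
        }

        @Override
        public void sendEmail() {
        }
    }

    /** Runs the given call against a proxy and returns the method names the proxy saw. */
    static List<String> record(Consumer<OrderOps> call) {
        List<String> intercepted = new ArrayList<>();
        OrderOps target = new PlainOrderService();
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // where an async interceptor would hook in
            return method.invoke(target, args);
        };
        OrderOps proxy = (OrderOps) Proxy.newProxyInstance(
                OrderOps.class.getClassLoader(), new Class<?>[] {OrderOps.class}, handler);
        call.accept(proxy);
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(record(OrderOps::createOrder)); // sendEmail is not intercepted
        System.out.println(record(OrderOps::sendEmail));   // sendEmail is intercepted
    }
}
```

Only calls that enter through the proxy reference are visible to the interceptor; the nested `sendEmail()` call inside `createOrder()` is invisible to it, which is exactly why the annotation has no effect there.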
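2. Return values
An @Async method should return either void or a Future type; CompletableFuture is the most convenient choice because callers can compose it.

    @Service
    public class AsyncService {

        // The caller receives a future that completes when the method finishes.
        @Async
        public CompletableFuture<String> asyncMethod() {
            return CompletableFuture.completedFuture("result");
        }

        // No result: the caller cannot observe completion or failure.
        @Async
        public void fireAndForget() {
        }
    }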
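3. Exception handling

    @Service
    public class SafeAsyncService {

        private static final Logger log = LoggerFactory.getLogger(SafeAsyncService.class);

        // Spring already runs this method on the async executor, so the work is done
        // directly here. Wrapping it in CompletableFuture.supplyAsync would move it
        // to the common pool and tie up an executor thread waiting for it.
        @Async
        public CompletableFuture<String> safeAsync() {
            try {
                return CompletableFuture.completedFuture(doWork()); // doWork() is not shown in the original
            } catch (Exception e) {
                log.error("Async task failed", e);
                return CompletableFuture.failedFuture(e); // Java 9+; the caller sees the failure
            }
        }
    }

    // Global handler for exceptions thrown by @Async methods that return void.
    // Methods that return a Future report failures through the Future instead.
    @Configuration
    public class AsyncExceptionHandler implements AsyncConfigurer {

        private static final Logger log = LoggerFactory.getLogger(AsyncExceptionHandler.class);

        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return (ex, method, params) -> log.error(
                    "Async method failed: {}.{}, args: {}",
                    method.getDeclaringClass().getName(),
                    method.getName(),
                    Arrays.toString(params),
                    ex);
        }
    }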
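On the caller's side, a failure in a future-returning async method surfaces through the future itself. The following plain-JDK sketch shows one way to recover from it with `handle`; `AsyncFailureSketch`, `doWork`, and the "SMTP unavailable" message are invented for the example, and `supplyAsync` here merely plays the role of the async invocation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncFailureSketch {

    /** Stand-in for an async task that always fails. */
    static String doWork() {
        throw new IllegalStateException("SMTP unavailable");
    }

    /** Runs the failing task on a pool and turns the failure into a fallback value. */
    static String callAndRecover() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(AsyncFailureSketch::doWork, pool);
            return future
                    .handle((value, ex) -> ex == null ? value : "failed: " + unwrap(ex).getMessage())
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    /** Async stages wrap the original exception in CompletionException; peel it off. */
    private static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    public static void main(String[] args) {
        System.out.println(callAndRecover());
    }
}
```

If the caller never inspects the future, the exception is silently dropped, so some form of `handle`, `exceptionally`, or `whenComplete` is worth attaching.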
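4. Transactions

    @Service
    public class TransactionalService {

        // Order and EmailLog are placeholder types; the original snippet left
        // the order and log variables undeclared.
        @Transactional
        public void createOrder(Order order, EmailLog emailLog) {
            orderDao.save(order);
            // Same-class call (see caveat 1): runs synchronously inside this transaction.
            sendEmail(emailLog);
        }

        // When invoked through the proxy, this runs on a pool thread. Transactions are
        // bound to the thread, so it gets its own transaction, independent of the
        // caller's: a rollback in one does not roll back the other.
        @Async
        @Transactional
        public void sendEmail(EmailLog emailLog) {
            emailLogDao.save(emailLog);
        }
    }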
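5. Timeouts

    @Service
    public class TimeoutService {

        private static final Logger log = LoggerFactory.getLogger(TimeoutService.class);

        @Autowired
        private AsyncService asyncService; // the bean from caveat 2

        // Apply the timeout on the caller's side, to the future returned by the proxy.
        // orTimeout (Java 9+) only fails the future; it does not interrupt the running task.
        public CompletableFuture<String> withTimeout() {
            return asyncService.asyncMethod()
                    .orTimeout(5, TimeUnit.SECONDS)
                    .exceptionally(ex -> {
                        log.error("Timed out or failed", ex);
                        return "default";
                    });
        }
    }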
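The behaviour of `orTimeout` followed by `exceptionally` can be tried in isolation. This sketch assumes Java 9 or later; `TimeoutSketch` and the millisecond values are arbitrary choices for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TimeoutSketch {

    /** Runs a task of the given duration and falls back to "default" on timeout. */
    static String withTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                    .supplyAsync(() -> {
                        sleep(taskMillis); // simulated slow work
                        return "result";
                    }, pool)
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    .exceptionally(ex -> "default")
                    .join();
        } finally {
            // orTimeout does not stop the task, so interrupt it explicitly.
            pool.shutdownNow();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(withTimeout(2000, 100)); // task is slower than the timeout
        System.out.println(withTimeout(10, 2000));  // task finishes in time
    }
}
```

Because the timeout only completes the future, a task that has timed out keeps occupying its pool thread until it finishes or is interrupted; long-running work should still check for interruption or enforce its own limits.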
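Combining with @Transactional

    @Service
    public class OrderService {

        @Transactional
        public void createOrder(OrderRequest request) {
            Order order = orderDao.save(request);
            // Problem: the async task may start before this transaction commits,
            // so it may not see the new order, and it still runs if the transaction rolls back.
            notificationService.sendOrderNotification(order);
        }
    }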
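Recommended: go async only after the transaction commits

    @Service
    public class OrderService {

        @Autowired
        private ApplicationEventPublisher eventPublisher;

        @Transactional
        public void createOrder(OrderRequest request) {
            Order order = orderDao.save(request);
            // Publish inside the transaction; delivery is deferred until commit.
            eventPublisher.publishEvent(new OrderCreatedEvent(order));
        }
    }

    @Component
    public class OrderEventListener {

        // @TransactionalEventListener is itself an @EventListener, so a separate
        // @EventListener annotation is not needed. The listener fires only after the
        // publishing transaction commits, and @Async moves it onto the async executor.
        @Async
        @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
        public void handleOrderCreated(OrderCreatedEvent event) {
            sendNotification(event.getOrder());
        }
    }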
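Summary

| Concern | Guidance |
| --- | --- |
| Same-class calls | Must go through the proxy |
| Return values | Use CompletableFuture |
| Exceptions | Configure a global exception handler |
| Transactions | An async method's transaction is independent of the caller's |
| Timeouts | Use orTimeout |
| Thread pools | Isolate by business function |

@Async is a simple asynchronous mechanism, suited to non-critical background tasks. For critical business operations a message queue is the safer choice, since tasks waiting in an in-memory executor queue are lost if the process stops.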