Callable 与 WebAsyncTask
Callable
@ResponseBody
@GetMapping("/callable")
public Callable<String> helloGetCallable() throws Exception {
System.out.println(Thread.currentThread().getName() + " main thread start");
Callable<String> callable =
() -> {
System.out.println(Thread.currentThread().getName() + " child thread start");
TimeUnit.SECONDS.sleep(25); // 模拟处理业务逻辑,花费了5秒钟
System.out.println(Thread.currentThread().getName() + " child thread end");
// 最终返回的不是Callable对象,而是它里面的内容
return "hello world";
};
System.out.println(Thread.currentThread().getName() + " main thread end");
return callable;
}
/**
http-nio-8080-exec-1 main thread start
http-nio-8080-exec-1 main thread end
task-1 child thread start
task-1 child thread end
*/
WebAsyncTask
Spring 官方推荐如果我们需要超时处理的回调或者错误处理的回调,我们可以使用 WebAsyncTask 代替 Callable。
@ResponseBody
@GetMapping("/async_task")
public WebAsyncTask<String> helloGetWebAsyncTask() throws Exception {
System.out.println(Thread.currentThread().getName() + " main thread start");
Callable<String> callable =
() -> {
System.out.println(Thread.currentThread().getName() + " child thread start");
if (Math.random() < 0.5) {
throw new Exception("Exception Response");
}
TimeUnit.SECONDS.sleep(600); // 模拟处理业务逻辑,话费了5秒钟
System.out.println(Thread.currentThread().getName() + " child thread end");
return "hello world";
};
// 采用WebAsyncTask 返回 这样可以处理超时和错误 同时也可以指定使用的Excutor名称
WebAsyncTask<String> webAsyncTask = new WebAsyncTask<>(3000, callable);
// 注意:onCompletion表示完成,不管你是否超时、是否抛出异常,这个函数都会执行的
webAsyncTask.onCompletion(() -> System.out.println("Completion"));
// 这两个返回的内容,最终都会放进response里面去===========
webAsyncTask.onTimeout(() -> "Timeout");
// 备注:这个是Spring5新增的
webAsyncTask.onError(() -> "Exception");
System.out.println(Thread.currentThread().getName() + " main thread end");
return webAsyncTask;
}
WebAsyncTask 的源码如下:
public class WebAsyncTask<V> implements BeanFactoryAware {
// 正常执行的函数(通过WebAsyncTask的构造函数可以传进来)
private final Callable<V> callable;
// 处理超时时间(ms),可通过构造函数指定,也可以不指定(不会有超时处理)
private Long timeout;
// 执行任务的执行器。可以构造函数设置进来,手动指定。
private AsyncTaskExecutor executor;
// 若设置了,会根据此名称去IoC容器里找这个Bean(和上面二选一)
// 若传了executorName,请务必调用set方法设置beanFactory
private String executorName;
private BeanFactory beanFactory;
// 超时的回调
private Callable<V> timeoutCallback;
// 发生错误的回调
private Callable<V> errorCallback;
// 完成的回调(不管超时还是错误都会执行)
private Runnable completionCallback;
...
// 这是获取执行器的逻辑
@Nullable
public AsyncTaskExecutor getExecutor() {
if (this.executor != null) {
return this.executor;
} else if (this.executorName != null) {
Assert.state(this.beanFactory != null, "BeanFactory is required to look up an executor bean by name");
return this.beanFactory.getBean(this.executorName, AsyncTaskExecutor.class);
} else {
return null;
}
}
public void onTimeout(Callable<V> callback) {
this.timeoutCallback = callback;
}
public void onError(Callable<V> callback) {
this.errorCallback = callback;
}
public void onCompletion(Runnable callback) {
this.completionCallback = callback;
}
// 最终执行超时回调、错误回调、完成回调都是通过这个拦截器实现的
CallableProcessingInterceptor getInterceptor() {
return new CallableProcessingInterceptor() {
@Override
public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception {
return (timeoutCallback != null ? timeoutCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
}
@Override
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
return (errorCallback != null ? errorCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
}
@Override
public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
if (completionCallback != null) {
completionCallback.run();
}
}
};
}
}
WebAsyncTask 的异步编程 API,相比于 @Async 注解,WebAsyncTask 提供更加健全的 超时处理 和 异常处理 支持。但是 @Async 也有更优秀的地方,就是他不仅仅能用于 Controller 中,而是可以用在任何地方。
WebMvcConfigurerAdapter
@Configuration
public class RequestAsyncPoolConfig extends WebMvcConfigurerAdapter {
@Resource
private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
@Override
public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
//处理 callable超时
configurer.setDefaultTimeout(60 * 1000);
configurer.setTaskExecutor(myThreadPoolTaskExecutor);
configurer.registerCallableInterceptors(
timeoutCallableProcessingInterceptor()
);
}
@Bean
public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
return new TimeoutCallableProcessingInterceptor();
}
}