任务执行
任务提交
提交给线程池的 task 最好是同一种类的 task,因为不同种类的 task 执行的时间可能会有差异,这样的话如果线程池的大小是固定的,那么就会出现执行时间不均匀的情况。
task 中的 threadlocal 最好不要跨任务使用,因为线程池中的 worker 线程可能会随着任务的数量或者任务执行的异常增加或减少,这样跨任务的 threadlocal 传递就会出现问题。
invokeAll(调用所有的 Callable)
Executors 支持通过 invokeAll()一次批量提交多个 callable。这个方法结果一个 callable 的集合,然后返回一个 future 的列表。
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "task1",
() -> "task2",
() -> "task3");
executor.invokeAll(callables)
.stream()
.map(future -> {
try {
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println);
invokeAny
批量提交 callable 的另一种方式就是 invokeAny(),它的工作方式与 invokeAll()稍有不同。在等待 future 对象的过程中,这个方法将会阻塞直到第一个 callable 中止然后返回这一个 callable 的结果。为了测试这种行为,我们利用这个帮助方法来模拟不同执行时间的 callable。这个方法返回一个 callable,这个 callable 休眠指定 的时间直到返回给定的结果。
//这个callable方法是用来构造不同的Callable对象
Callable<String> callable(String result, long sleepSeconds) {
return () -> {
TimeUnit.SECONDS.sleep(sleepSeconds);
return result;
};
}
我们利用这个方法创建一组 callable,这些 callable 拥有不同的执行时间,从 1 分钟到 3 分钟。通过 invokeAny()将这些 callable 提交给一个 executor,返回最快的 callable 的字符串结果-在这个例子中为任务 2:
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));
String result = executor.invokeAny(callables);
System.out.println(result);
// => task2
结合 Callable 与 Future 进行异步编程
Executors 支持通过 invokeAll()一次批量提交多个 callable。这个方法结果一个 callable 的集合,然后返回一个 future 的列表。
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "task1",
() -> "task2",
() -> "task3");
executor.invokeAll(callables)
.stream()
.map(future -> {
try {
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println);
在这个例子中,我们利用 Java8 中的函数流(stream)来处理 invokeAll()调用返回的所有 future。我们首先将每一个 future 映射到它的返回值,然后将每个值打印到控制台。如果你还不属性 stream,可以阅读我的Java8 Stream 教程。
批量提交 callable 的另一种方式就是 invokeAny(),它的工作方式与 invokeAll()稍有不同。在等待 future 对象的过程中,这个方法将会阻塞直到第一个 callable 中止然后返回这一个 callable 的结果。为了测试这种行为,我们利用这个帮助方法来模拟不同执行时间的 callable。这个方法返回一个 callable,这个 callable 休眠指定 的时间直到返回给定的结果。
//这个callable方法是用来构造不同的Callable对象
Callable<String> callable(String result, long sleepSeconds) {
return () -> {
TimeUnit.SECONDS.sleep(sleepSeconds);
return result;
};
}
我们利用这个方法创建一组 callable,这些 callable 拥有不同的执行时间,从 1 分钟到 3 分钟。通过 invokeAny()将这些 callable 提交给一个 executor,返回最快的 callable 的字符串结果-在这个例子中为任务 2:
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));
String result = executor.invokeAny(callables);
System.out.println(result);
// => task2