任务执行

任务提交

提交给线程池的 task 最好是同一种类的 task,因为不同种类的 task 执行的时间可能会有差异,这样的话如果线程池的大小是固定的,那么就会出现执行时间不均匀的情况。

task 中的 threadlocal 最好不要跨任务使用,因为线程池中的 worker 线程可能会随着任务的数量或者任务执行的异常增加或减少,这样跨任务的 threadlocal 传递就会出现问题。

invokeAll(调用所有的 Callable)

Executors 支持通过 invokeAll()一次批量提交多个 callable。这个方法结果一个 callable 的集合,然后返回一个 future 的列表。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

invokeAny

批量提交 callable 的另一种方式就是 invokeAny(),它的工作方式与 invokeAll()稍有不同。在等待 future 对象的过程中,这个方法将会阻塞直到第一个 callable 中止然后返回这一个 callable 的结果。为了测试这种行为,我们利用这个帮助方法来模拟不同执行时间的 callable。这个方法返回一个 callable,这个 callable 休眠指定 的时间直到返回给定的结果。

//这个callable方法是用来构造不同的Callable对象
Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

我们利用这个方法创建一组 callable,这些 callable 拥有不同的执行时间,从 1 分钟到 3 分钟。通过 invokeAny()将这些 callable 提交给一个 executor,返回最快的 callable 的字符串结果-在这个例子中为任务 2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

结合 Callable 与 Future 进行异步编程

Executors 支持通过 invokeAll()一次批量提交多个 callable。这个方法结果一个 callable 的集合,然后返回一个 future 的列表。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

在这个例子中,我们利用 Java8 中的函数流(stream)来处理 invokeAll()调用返回的所有 future。我们首先将每一个 future 映射到它的返回值,然后将每个值打印到控制台。如果你还不属性 stream,可以阅读我的Java8 Stream 教程

批量提交 callable 的另一种方式就是 invokeAny(),它的工作方式与 invokeAll()稍有不同。在等待 future 对象的过程中,这个方法将会阻塞直到第一个 callable 中止然后返回这一个 callable 的结果。为了测试这种行为,我们利用这个帮助方法来模拟不同执行时间的 callable。这个方法返回一个 callable,这个 callable 休眠指定 的时间直到返回给定的结果。

//这个callable方法是用来构造不同的Callable对象
Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

我们利用这个方法创建一组 callable,这些 callable 拥有不同的执行时间,从 1 分钟到 3 分钟。通过 invokeAny()将这些 callable 提交给一个 executor,返回最快的 callable 的字符串结果-在这个例子中为任务 2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2
上一页