快速开始

快速开始

实例化 ExecutorService

实例化 ExecutorService 的方式有两种:一种是工厂方法,另一种是直接创建。

Executors.newFixedThreadPool() 工厂方法创建 ExecutorService 实例

创建 ExecutorService 实例的最简单方法是使用 Executors 类的提供的工厂方法。比如:

ExecutorService executor = Executors.newFixedThreadPool(10);

当然还有其它很多工厂方法,每种工厂方法都可以创建满足特定用例的预定义 ExecutorService 实例。你所需要做的就是找到自己想要的合适的方法。

直接创建 ExecutorService 的实例

因为 ExecutorService 是只是一个接口,因此可以使用其任何实现类的实例。Java java.util.concurrent 包已经预定义了几种实现可供我们选择,或者你也可以创建自己的实现。例如,ThreadPoolExecutor 类实现了 ExecutorService 接口并提供了一些构造函数用于配置执行程序服务及其内部池。

ExecutorService executorService =
  new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
  new LinkedBlockingQueue<Runnable>()
);

将任务分配给 ExecutorService

ExecutorService 可以执行 Runnable 和 Callable 任务。

Runnable runnableTask = () -> {
    try {
        TimeUnit.MILLISECONDS.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Callable<String> callableTask = () -> {
    TimeUnit.MILLISECONDS.sleep(300);
    return "Task's execution";
};

List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);

创建完了任务之后,就可以使用多种方法将任务分配给 ExecutorService ,比如 execute() 方法,还有 submit()、invokeAny() 和 invokeAll() 等方法。这些方法都继承自 Executor 接口。

  • 首先来看看 execute() 方法。

该方法返回值为空 ( void )。因此使用该方法没有任何可能获得任务执行结果或检查任务的状态( 是正在运行 ( running ) 还是执行完毕 ( executed ))。

executorService.execute(runnableTask);
  • 其次看看 submit() 方法。

submit() 方法会将一个 Callable 或 Runnable 任务提交给 ExecutorService 并返回 Future 类型的结果。

Future<String> future = executorService.submit(callableTask);
  • 然后是 invokeAny() 方法。

invokeAny() 方法将一组任务分配给 ExecutorService,使每个任务执行,并返回任意一个成功执行的任务的结果 ( 如果成功执行 )

String result = executorService.invokeAny(callableTasks);
  • 最后是 invokeAll() 方法。

invokeAll() 方法将一组任务分配给 ExecutorService ,使每个任务执行,并以 Future 类型的对象列表的形式返回所有任务执行的结果。

List<Future<String>> futures = executorService.invokeAll(callableTasks);

关闭 ExecutorService

一般情况下,ExecutorService 并不会自动关闭,即使所有任务都执行完毕,或者没有要处理的任务,也不会自动销毁 ExecutorService 。它会一直出于等待状态,等待我们给它分配新的工作。这种机制,在某些情况下是非常有用的,比如,如果应用程序需要处理不定期出现的任务,或者在编译时不知道这些任务的数量。但另一方面,这也带来了副作用:即使应用程序可能已经到达它的终点,但并不会被停止,因为等待的 ExecutorService 将导致 JVM 继续运行。这样,我们就需要主动关闭 ExecutorService。要正确的关闭 ExecutorService,可以调用实例的 shutdown() 或 shutdownNow() 方法。

ExecutorService 接口继承了 Executor 接口,定义了一些生命周期的方法:

public interface ExecutorService extends Executor {
  void shutdown();
  List<Runnable> shutdownNow();
  boolean isShutdown();
  boolean isTerminated();
  boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
}
  • shutdown 方法:这个方法会平滑地关闭 ExecutorService,当我们调用这个方法时,ExecutorService 停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已 经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。

  • awaitTermination 方法:这个方法有两个参数,一个是 timeout 即超 时时间,另一个是 unit 即时间单位。这个方法会使线程等待 timeout 时长,当超过 timeout 时间后,会监测 ExecutorService 是否已经关闭,若关闭则返回 true,否则返回 false。一般情况下会和 shutdown 方法组合使用。

  • shutdown() 方法:

executorService.shutdown();

shutdown() 方法并不会立即销毁 ExecutorService 实例,而是首先让 ExecutorService 停止接受新任务,并在所有正在运行的线程完成当前工作后关闭。

  • shutdownNow() 方法:
List<Runnable> notExecutedTasks = executorService.shutDownNow();

shutdownNow() 方法会尝试立即销毁 ExecutorService 实例,所以并不能保证所有正在运行的线程将同时停止。该方法会返回等待处理的任务列表,由开发人员自行决定如何处理这些任务。

因为提供了两个方法,因此关闭 ExecutorService 实例的最佳实战(也是 Oracle 所推荐的)就是同时使用这两种方法并结合 awaitTermination() 方法。使用这种方式,ExecutorService 首先停止执行新任务,等待指定的时间段完成所有任务。如果该时间到期,则立即停止执行。

executorService.shutdown();
try {
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    }
} catch (InterruptedException e) {
    executorService.shutdownNow();
}

Future 接口

submit() 方法和 invokeAll() 方法返回一个 Future 接口的对象或 Future 类型的对象集合。这些 Future 接口的对象允许我们获取任务执行的结果或检查任务的状态(是正在运行还是执行完毕)。

Future 接口 get() 方法

Future 接口提供了一个特殊的阻塞方法 get(),它返回 Callable 任务执行的实际结果,但如果是 Runnable 任务,则只会返回 null。因为 get() 方法是阻塞的。如果调用 get() 方法时任务仍在运行,那么调用将会一直被执阻塞,直到任务正确执行完毕并且结果可用时才返回。

而且更重要的是,正在被执行的任务随时都可能抛出异常或中断执行。因此我们要将 get() 调用放在 try catch 语句块中,并捕捉 InterruptedException 或 ExecutionException 异常。

Future<String> future = executorService.submit(callableTask);
String result = null;
try {
    result = future.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

因为 get() 方法是阻塞的,而且并不知道要阻塞多长时间。因此可能导致应用程序的性能降低。如果结果数据并不重要,那么我们可以使用超时机制来避免长时间阻塞。

String result = future.get(200, TimeUnit.MILLISECONDS);

这个 get() 的重载,第一个参数为超时的时间,第二个参数为时间的单位。上面的实例所表示就的就是等待 200 毫秒。

注意,这个 get() 重载方法,如果在超时时间内正常结束,那么返回的是 Future 类型的结果,如果超时了还没结束,那么将抛出 TimeoutException 异常。

除了 get() 方法之外,Future 还提供了其它很多方法,我们将几个重要的方法罗列在此

方法 说明
isDone() 检查已分配的任务是否已处理
cancel() 取消任务执行
isCancelled() 检查任务是否已取消

这些方法的使用方式如下

boolean isDone      = future.isDone();
boolean canceled    = future.cancel(true);
boolean isCancelled = future.isCancelled();
上一页
下一页