ExecutorService中界说了两个批量执行任务的要领,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中很是利便。invokeAll()在所有任务都完成(包罗乐成/被间断/超时)后才会返回,invokeAny()在任意一个任务乐成(或ExecutorService被间断/超时)后就会返回。
AbstractExecutorService实现了这两个要领,本文将先后阐明invokeAll()和invokeAny()两个要领的源码实现。
invokeAll()
invokeAll()在所有任务都完成(包罗乐成/被间断/超时)后才会返回。有不限时和限时版本,从更简朴的不限时版入手。
不限时版
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { f.get(); // 无所谓先执行哪个任务的get()要领 } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; return futures; } finally { if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
8-12行,先将所有任务都提交到线程池(虽然,任何ExecutorService均可)中。
严格来说,不是“提交”,而是“执行”。执行大概是同步或异步的,取决于线程池的计策。不外由于我们仅接头异步环境(同步同理),用“提交”一词更容易领略。下同。
13-22行,for轮回的目标是阻塞挪用invokeAll的线程,直到所有任务都执行完毕。虽然我们也可以利用其他方法实现阻塞,不外这种方法是最简朴的:
最后,为防备在全部任务竣事之前过早退出,23行、25-29行相共同,假如done不为true(未执行到40行就退出了)则打消全部任务。
限时版
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) futures.add(newTaskFor(t)); final long deadline = System.nanoTime() + nanos; final int size = futures.size(); // Interleave time checks and calls to execute in case // executor doesn't have any/much parallelism. for (int i = 0; i < size; i++) { execute((Runnable)futures.get(i)); nanos = deadline - System.nanoTime(); if (nanos <= 0L) // 实时查抄是否超时 return futures; } for (int i = 0; i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { if (nanos <= 0L) // 实时查抄是否超时 return futures; try { f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } nanos = deadline - System.nanoTime(); } } done = true; return futures; } finally { if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
10-11行,先将所有任务封装为FutureTask,添加到futures列表中。
18-23行,每提交一个任务,就立即判定是否超时。这样的话,假如在任务全部提交到线程池中之前,昆山软件开发,就已经到达了超时时间,昆山软件开发,则可以或许尽快查抄出超时,竣事提交并退出。