Source for java.util.concurrent.ExecutorCompletionService

   1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent;
   8: 
   9: /**
  10:  * A {@link CompletionService} that uses a supplied {@link Executor}
  11:  * to execute tasks.  This class arranges that submitted tasks are,
  12:  * upon completion, placed on a queue accessible using <tt>take</tt>.
  13:  * The class is lightweight enough to be suitable for transient use
  14:  * when processing groups of tasks.
  15:  *
  16:  * <p>
  17:  *
  18:  * <b>Usage Examples.</b>
  19:  *
  20:  * Suppose you have a set of solvers for a certain problem, each
  21:  * returning a value of some type <tt>Result</tt>, and would like to
  22:  * run them concurrently, processing the results of each of them that
  23:  * return a non-null value, in some method <tt>use(Result r)</tt>. You
  24:  * could write this as:
  25:  *
  26:  * <pre>
  27:  *   void solve(Executor e,
  28:  *              Collection&lt;Callable&lt;Result&gt;&gt; solvers)
  29:  *     throws InterruptedException, ExecutionException {
  30:  *       CompletionService&lt;Result&gt; ecs
  31:  *           = new ExecutorCompletionService&lt;Result&gt;(e);
  32:  *       for (Callable&lt;Result&gt; s : solvers)
  33:  *           ecs.submit(s);
  34:  *       int n = solvers.size();
  35:  *       for (int i = 0; i &lt; n; ++i) {
  36:  *           Result r = ecs.take().get();
  37:  *           if (r != null)
  38:  *               use(r);
  39:  *       }
  40:  *   }
  41:  * </pre>
  42:  *
  43:  * Suppose instead that you would like to use the first non-null result
  44:  * of the set of tasks, ignoring any that encounter exceptions,
  45:  * and cancelling all other tasks when the first one is ready:
  46:  *
  47:  * <pre>
  48:  *   void solve(Executor e,
  49:  *              Collection&lt;Callable&lt;Result&gt;&gt; solvers)
  50:  *     throws InterruptedException {
  51:  *       CompletionService&lt;Result&gt; ecs
  52:  *           = new ExecutorCompletionService&lt;Result&gt;(e);
  53:  *       int n = solvers.size();
  54:  *       List&lt;Future&lt;Result&gt;&gt; futures
  55:  *           = new ArrayList&lt;Future&lt;Result&gt;&gt;(n);
  56:  *       Result result = null;
  57:  *       try {
  58:  *           for (Callable&lt;Result&gt; s : solvers)
  59:  *               futures.add(ecs.submit(s));
  60:  *           for (int i = 0; i &lt; n; ++i) {
  61:  *               try {
  62:  *                   Result r = ecs.take().get();
  63:  *                   if (r != null) {
  64:  *                       result = r;
  65:  *                       break;
  66:  *                   }
  67:  *               } catch (ExecutionException ignore) {}
  68:  *           }
  69:  *       }
  70:  *       finally {
  71:  *           for (Future&lt;Result&gt; f : futures)
  72:  *               f.cancel(true);
  73:  *       }
  74:  *
  75:  *       if (result != null)
  76:  *           use(result);
  77:  *   }
  78:  * </pre>
  79:  */
  80: public class ExecutorCompletionService<V> implements CompletionService<V> {
  81:     /** The executor on which every submitted task is run. */
  82:     private final Executor executor;
  83: 
  84:     /**
  85:      * The executor viewed as an {@link AbstractExecutorService}, or
  86:      * <tt>null</tt> if it is not one.  Consulted only when creating
  87:      * the future for a newly submitted task.
  88:      */
  89:     private final AbstractExecutorService aes;
  90: 
  91:     /** The queue to which tasks are added as they finish. */
  92:     private final BlockingQueue<Future<V>> completionQueue;
  93: 
  94:     /**
  95:      * Wrapper that runs a task and, once the wrapper itself is done,
  96:      * adds the wrapped task (not the wrapper) to the completion queue.
  97:      */
  98:     private class QueueingFuture extends FutureTask<Void> {
  99:         private final Future<V> task;
 100: 
 101:         QueueingFuture(RunnableFuture<V> task) {
 102:             super(task, null);
 103:             this.task = task;
 104:         }
 105: 
 106:         @Override
 107:         protected void done() {
 108:             completionQueue.add(task);
 109:         }
 110:     }
 111: 
 112:     /**
 113:      * Creates an ExecutorCompletionService using the supplied
 114:      * executor for base task execution and a
 115:      * {@link LinkedBlockingQueue} as a completion queue.
 116:      *
 117:      * @param executor the executor to use
 118:      * @throws NullPointerException if executor is <tt>null</tt>
 119:      */
 120:     public ExecutorCompletionService(Executor executor) {
 121:         // Null checking of the executor is left to the delegate.
 122:         this(executor, new LinkedBlockingQueue<Future<V>>());
 123:     }
 124: 
 125:     /**
 126:      * Creates an ExecutorCompletionService using the supplied
 127:      * executor for base task execution and the supplied queue as its
 128:      * completion queue.
 129:      *
 130:      * @param executor the executor to use
 131:      * @param completionQueue the queue to use as the completion queue,
 132:      * normally one dedicated for use by this service
 133:      * @throws NullPointerException if executor or completionQueue is
 134:      * <tt>null</tt>
 135:      */
 136:     public ExecutorCompletionService(Executor executor,
 137:                                      BlockingQueue<Future<V>> completionQueue) {
 138:         if (executor == null || completionQueue == null)
 139:             throw new NullPointerException();
 140:         this.executor = executor;
 141:         if (executor instanceof AbstractExecutorService)
 142:             this.aes = (AbstractExecutorService) executor;
 143:         else
 144:             this.aes = null;
 145:         this.completionQueue = completionQueue;
 146:     }
 147: 
 148:     /**
 149:      * Creates the future for a callable, deferring to the executor's
 150:      * own <tt>newTaskFor</tt> when it is an AbstractExecutorService.
 151:      */
 152:     private RunnableFuture<V> newTaskFor(Callable<V> task) {
 153:         if (aes != null)
 154:             return aes.newTaskFor(task);
 155:         return new FutureTask<V>(task);
 156:     }
 157: 
 158:     /**
 159:      * Creates the future for a runnable and its result, deferring to
 160:      * the executor's own <tt>newTaskFor</tt> when it is an
 161:      * AbstractExecutorService.
 162:      */
 163:     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
 164:         if (aes != null)
 165:             return aes.newTaskFor(task, result);
 166:         return new FutureTask<V>(task, result);
 167:     }
 168: 
 169:     /**
 170:      * Hands the future to the executor inside a wrapper that queues
 171:      * it on completion, then returns the future itself to the caller.
 172:      */
 173:     private Future<V> launch(RunnableFuture<V> future) {
 174:         executor.execute(new QueueingFuture(future));
 175:         return future;
 176:     }
 177: 
 178:     /**
 179:      * Submits a value-returning task for execution.  When the task
 180:      * is done, its future is added to the completion queue.
 181:      *
 182:      * @param task the task to submit
 183:      * @return a future representing the pending result of the task
 184:      * @throws NullPointerException if task is <tt>null</tt>
 185:      */
 186:     public Future<V> submit(Callable<V> task) {
 187:         if (task == null)
 188:             throw new NullPointerException();
 189:         return launch(newTaskFor(task));
 190:     }
 191: 
 192:     /**
 193:      * Submits a runnable task for execution together with the value
 194:      * its future should carry.  When the task is done, its future is
 195:      * added to the completion queue.
 196:      *
 197:      * @param task the task to submit
 198:      * @param result the result to associate with the task's future
 199:      * @return a future representing the pending completion of the task
 200:      * @throws NullPointerException if task is <tt>null</tt>
 201:      */
 202:     public Future<V> submit(Runnable task, V result) {
 203:         if (task == null)
 204:             throw new NullPointerException();
 205:         return launch(newTaskFor(task, result));
 206:     }
 207: 
 208:     /**
 209:      * Removes and returns the next future from the completion queue,
 210:      * waiting if the queue is currently empty.
 211:      *
 212:      * @return the future of the next completed task
 213:      * @throws InterruptedException if interrupted while waiting
 214:      */
 215:     public Future<V> take() throws InterruptedException {
 216:         return completionQueue.take();
 217:     }
 218: 
 219:     /**
 220:      * Removes and returns the next future from the completion queue
 221:      * without waiting.
 222:      *
 223:      * @return the future of the next completed task, or <tt>null</tt>
 224:      * if the queue is currently empty
 225:      */
 226:     public Future<V> poll() {
 227:         return completionQueue.poll();
 228:     }
 229: 
 230:     /**
 231:      * Removes and returns the next future from the completion queue,
 232:      * waiting up to the given time for one to become available.
 233:      *
 234:      * @param timeout how long to wait before giving up, in units of
 235:      * <tt>unit</tt>
 236:      * @param unit the time unit of the <tt>timeout</tt> argument
 237:      * @return the future of the next completed task, or <tt>null</tt>
 238:      * if the wait elapses before one is present
 239:      * @throws InterruptedException if interrupted while waiting
 240:      */
 241:     public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
 242:         return completionQueue.poll(timeout, unit);
 243:     }
 244: 
 245: }