Source for java.util.concurrent.FutureTask

   1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent;
   8: import java.util.concurrent.locks.*;
   9: 
  10: /**
  11:  * A cancellable asynchronous computation.  This class provides a base
  12:  * implementation of {@link Future}, with methods to start and cancel
  13:  * a computation, query to see if the computation is complete, and
  14:  * retrieve the result of the computation.  The result can only be
  15:  * retrieved when the computation has completed; the <tt>get</tt>
  16:  * method will block if the computation has not yet completed.  Once
  17:  * the computation has completed, the computation cannot be restarted
  18:  * or cancelled.
  19:  *
  20:  * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
  21:  * {@link java.lang.Runnable} object.  Because <tt>FutureTask</tt>
  22:  * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
  23:  * submitted to an {@link Executor} for execution.
  24:  *
  25:  * <p>In addition to serving as a standalone class, this class provides
  26:  * <tt>protected</tt> functionality that may be useful when creating
  27:  * customized task classes.
  28:  *
  29:  * @since 1.5
  30:  * @author Doug Lea
  31:  * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
  32:  */
  33: public class FutureTask<V> implements RunnableFuture<V> {
  34:     /** Synchronization control for FutureTask */
  35:     private final Sync sync;
  36: 
  37:     /**
  38:      * Creates a <tt>FutureTask</tt> that will upon running, execute the
  39:      * given <tt>Callable</tt>.
  40:      *
  41:      * @param  callable the callable task
  42:      * @throws NullPointerException if callable is null
  43:      */
  44:     public FutureTask(Callable<V> callable) {
  45:         if (callable == null)
  46:             throw new NullPointerException();
  47:         sync = new Sync(callable);
  48:     }
  49: 
  50:     /**
  51:      * Creates a <tt>FutureTask</tt> that will upon running, execute the
  52:      * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
  53:      * given result on successful completion.
  54:      *
  55:      * @param  runnable the runnable task
  56:      * @param result the result to return on successful completion. If
  57:      * you don't need a particular result, consider using
  58:      * constructions of the form:
  59:      * <tt>Future&lt;?&gt; f = new FutureTask&lt;Object&gt;(runnable, null)</tt>
  60:      * @throws NullPointerException if runnable is null
  61:      */
  62:     public FutureTask(Runnable runnable, V result) {
  63:         sync = new Sync(Executors.callable(runnable, result));
  64:     }
  65: 
  66:     public boolean isCancelled() {
  67:         return sync.innerIsCancelled();
  68:     }
  69: 
  70:     public boolean isDone() {
  71:         return sync.innerIsDone();
  72:     }
  73: 
  74:     public boolean cancel(boolean mayInterruptIfRunning) {
  75:         return sync.innerCancel(mayInterruptIfRunning);
  76:     }
  77: 
  78:     /**
  79:      * @throws CancellationException {@inheritDoc}
  80:      */
  81:     public V get() throws InterruptedException, ExecutionException {
  82:         return sync.innerGet();
  83:     }
  84: 
  85:     /**
  86:      * @throws CancellationException {@inheritDoc}
  87:      */
  88:     public V get(long timeout, TimeUnit unit)
  89:         throws InterruptedException, ExecutionException, TimeoutException {
  90:         return sync.innerGet(unit.toNanos(timeout));
  91:     }
  92: 
  93:     /**
  94:      * Protected method invoked when this task transitions to state
  95:      * <tt>isDone</tt> (whether normally or via cancellation). The
  96:      * default implementation does nothing.  Subclasses may override
  97:      * this method to invoke completion callbacks or perform
  98:      * bookkeeping. Note that you can query status inside the
  99:      * implementation of this method to determine whether this task
 100:      * has been cancelled.
 101:      */
 102:     protected void done() { }
 103: 
 104:     /**
 105:      * Sets the result of this Future to the given value unless
 106:      * this future has already been set or has been cancelled.
 107:      * This method is invoked internally by the <tt>run</tt> method
 108:      * upon successful completion of the computation.
 109:      * @param v the value
 110:      */
 111:     protected void set(V v) {
 112:         sync.innerSet(v);
 113:     }
 114: 
 115:     /**
 116:      * Causes this future to report an <tt>ExecutionException</tt>
 117:      * with the given throwable as its cause, unless this Future has
 118:      * already been set or has been cancelled.
 119:      * This method is invoked internally by the <tt>run</tt> method
 120:      * upon failure of the computation.
 121:      * @param t the cause of failure
 122:      */
 123:     protected void setException(Throwable t) {
 124:         sync.innerSetException(t);
 125:     }
 126: 
 127:     // The following (duplicated) doc comment can be removed once
 128:     //
 129:     // 6270645: Javadoc comments should be inherited from most derived
 130:     //          superinterface or superclass
 131:     // is fixed.
 132:     /**
 133:      * Sets this Future to the result of its computation
 134:      * unless it has been cancelled.
 135:      */
 136:     public void run() {
 137:         sync.innerRun();
 138:     }
 139: 
 140:     /**
 141:      * Executes the computation without setting its result, and then
 142:      * resets this Future to initial state, failing to do so if the
 143:      * computation encounters an exception or is cancelled.  This is
 144:      * designed for use with tasks that intrinsically execute more
 145:      * than once.
 146:      * @return true if successfully run and reset
 147:      */
 148:     protected boolean runAndReset() {
 149:         return sync.innerRunAndReset();
 150:     }
 151: 
 152:     /**
 153:      * Synchronization control for FutureTask. Note that this must be
 154:      * a non-static inner class in order to invoke the protected
 155:      * <tt>done</tt> method. For clarity, all inner class support
 156:      * methods are same as outer, prefixed with "inner".
 157:      *
 158:      * Uses AQS sync state to represent run status
 159:      */
 160:     private final class Sync extends AbstractQueuedSynchronizer {
 161:         private static final long serialVersionUID = -7828117401763700385L;
 162: 
 163:         /** State value representing that task is running */
 164:         private static final int RUNNING   = 1;
 165:         /** State value representing that task ran */
 166:         private static final int RAN       = 2;
 167:         /** State value representing that task was cancelled */
 168:         private static final int CANCELLED = 4;
 169: 
 170:         /** The underlying callable */
 171:         private final Callable<V> callable;
 172:         /** The result to return from get() */
 173:         private V result;
 174:         /** The exception to throw from get() */
 175:         private Throwable exception;
 176: 
 177:         /**
 178:          * The thread running task. When nulled after set/cancel, this
 179:          * indicates that the results are accessible.  Must be
 180:          * volatile, to ensure visibility upon completion.
 181:          */
 182:         private volatile Thread runner;
 183: 
 184:         Sync(Callable<V> callable) {
 185:             this.callable = callable;
 186:         }
 187: 
 188:         private boolean ranOrCancelled(int state) {
 189:             return (state & (RAN | CANCELLED)) != 0;
 190:         }
 191: 
 192:         /**
 193:          * Implements AQS base acquire to succeed if ran or cancelled
 194:          */
 195:         protected int tryAcquireShared(int ignore) {
 196:             return innerIsDone()? 1 : -1;
 197:         }
 198: 
 199:         /**
 200:          * Implements AQS base release to always signal after setting
 201:          * final done status by nulling runner thread.
 202:          */
 203:         protected boolean tryReleaseShared(int ignore) {
 204:             runner = null;
 205:             return true;
 206:         }
 207: 
 208:         boolean innerIsCancelled() {
 209:             return getState() == CANCELLED;
 210:         }
 211: 
 212:         boolean innerIsDone() {
 213:             return ranOrCancelled(getState()) && runner == null;
 214:         }
 215: 
 216:         V innerGet() throws InterruptedException, ExecutionException {
 217:             acquireSharedInterruptibly(0);
 218:             if (getState() == CANCELLED)
 219:                 throw new CancellationException();
 220:             if (exception != null)
 221:                 throw new ExecutionException(exception);
 222:             return result;
 223:         }
 224: 
 225:         V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
 226:             if (!tryAcquireSharedNanos(0, nanosTimeout))
 227:                 throw new TimeoutException();
 228:             if (getState() == CANCELLED)
 229:                 throw new CancellationException();
 230:             if (exception != null)
 231:                 throw new ExecutionException(exception);
 232:             return result;
 233:         }
 234: 
 235:         void innerSet(V v) {
 236:         for (;;) {
 237:         int s = getState();
 238:         if (s == RAN)
 239:             return;
 240:                 if (s == CANCELLED) {
 241:             // aggressively release to set runner to null,
 242:             // in case we are racing with a cancel request
 243:             // that will try to interrupt runner
 244:                     releaseShared(0);
 245:                     return;
 246:                 }
 247:         if (compareAndSetState(s, RAN)) {
 248:                     result = v;
 249:                     releaseShared(0);
 250:                     done();
 251:             return;
 252:                 }
 253:             }
 254:         }
 255: 
 256:         void innerSetException(Throwable t) {
 257:         for (;;) {
 258:         int s = getState();
 259:         if (s == RAN)
 260:             return;
 261:                 if (s == CANCELLED) {
 262:             // aggressively release to set runner to null,
 263:             // in case we are racing with a cancel request
 264:             // that will try to interrupt runner
 265:                     releaseShared(0);
 266:                     return;
 267:                 }
 268:         if (compareAndSetState(s, RAN)) {
 269:                     exception = t;
 270:                     result = null;
 271:                     releaseShared(0);
 272:                     done();
 273:             return;
 274:                 }
 275:         }
 276:         }
 277: 
 278:         boolean innerCancel(boolean mayInterruptIfRunning) {
 279:         for (;;) {
 280:         int s = getState();
 281:         if (ranOrCancelled(s))
 282:             return false;
 283:         if (compareAndSetState(s, CANCELLED))
 284:             break;
 285:         }
 286:             if (mayInterruptIfRunning) {
 287:                 Thread r = runner;
 288:                 if (r != null) 
 289:                     r.interrupt();
 290:             }
 291:             releaseShared(0);
 292:             done();
 293:             return true;
 294:         }
 295: 
 296:         void innerRun() {
 297:             if (!compareAndSetState(0, RUNNING))
 298:                 return;
 299:             try {
 300:                 runner = Thread.currentThread();
 301:                 if (getState() == RUNNING) // recheck after setting thread
 302:                     innerSet(callable.call());
 303:                 else
 304:                     releaseShared(0); // cancel
 305:             } catch (Throwable ex) {
 306:                 innerSetException(ex);
 307:             }
 308:         }
 309: 
 310:         boolean innerRunAndReset() {
 311:             if (!compareAndSetState(0, RUNNING))
 312:                 return false;
 313:             try {
 314:                 runner = Thread.currentThread();
 315:                 if (getState() == RUNNING)
 316:                     callable.call(); // don't set result
 317:                 runner = null;
 318:                 return compareAndSetState(RUNNING, 0);
 319:             } catch (Throwable ex) {
 320:                 innerSetException(ex);
 321:                 return false;
 322:             }
 323:         }
 324:     }
 325: }