Source for java.util.concurrent.ThreadPoolExecutor

   1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent;
   8: import java.util.concurrent.locks.*;
   9: import java.util.*;
  10: 
  11: /**
  12:  * An {@link ExecutorService} that executes each submitted task using
  13:  * one of possibly several pooled threads, normally configured
  14:  * using {@link Executors} factory methods.
  15:  *
  16:  * <p>Thread pools address two different problems: they usually
  17:  * provide improved performance when executing large numbers of
  18:  * asynchronous tasks, due to reduced per-task invocation overhead,
  19:  * and they provide a means of bounding and managing the resources,
  20:  * including threads, consumed when executing a collection of tasks.
  21:  * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
  22:  * statistics, such as the number of completed tasks.
  23:  *
  24:  * <p>To be useful across a wide range of contexts, this class
  25:  * provides many adjustable parameters and extensibility
  26:  * hooks. However, programmers are urged to use the more convenient
  27:  * {@link Executors} factory methods {@link
  28:  * Executors#newCachedThreadPool} (unbounded thread pool, with
  29:  * automatic thread reclamation), {@link Executors#newFixedThreadPool}
  30:  * (fixed size thread pool) and {@link
  31:  * Executors#newSingleThreadExecutor} (single background thread), that
  32:  * preconfigure settings for the most common usage
  33:  * scenarios. Otherwise, use the following guide when manually
  34:  * configuring and tuning this class:
  35:  *
  36:  * <dl>
  37:  *
  38:  * <dt>Core and maximum pool sizes</dt>
  39:  *
  40:  * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
  41:  * pool size
  42:  * (see {@link ThreadPoolExecutor#getPoolSize})
  43:  * according to the bounds set by corePoolSize
  44:  * (see {@link ThreadPoolExecutor#getCorePoolSize})
  45:  * and
  46:  * maximumPoolSize
  47:  * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
  48:  * When a new task is submitted in method {@link
  49:  * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
  50:  * are running, a new thread is created to handle the request, even if
  51:  * other worker threads are idle.  If there are more than
  52:  * corePoolSize but less than maximumPoolSize threads running, a new
  53:  * thread will be created only if the queue is full.  By setting
  54:  * corePoolSize and maximumPoolSize the same, you create a fixed-size
  55:  * thread pool. By setting maximumPoolSize to an essentially unbounded
  56:  * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
  57:  * accommodate an arbitrary number of concurrent tasks. Most typically,
  58:  * core and maximum pool sizes are set only upon construction, but they
  59:  * may also be changed dynamically using {@link
  60:  * ThreadPoolExecutor#setCorePoolSize} and {@link
  61:  * ThreadPoolExecutor#setMaximumPoolSize}. <dd>
  62:  *
  63:  * <dt> On-demand construction
  64:  *
  65:  * <dd> By default, even core threads are initially created and
  66:  * started only when new tasks arrive, but this can be overridden
  67:  * dynamically using method {@link
  68:  * ThreadPoolExecutor#prestartCoreThread} or
  69:  * {@link ThreadPoolExecutor#prestartAllCoreThreads}.
  70:  * You probably want to prestart threads if you construct the
  71:  * pool with a non-empty queue. </dd>
  72:  *
  73:  * <dt>Creating new threads</dt>
  74:  *
  75:  * <dd>New threads are created using a {@link
  76:  * java.util.concurrent.ThreadFactory}.  If not otherwise specified, a
  77:  * {@link Executors#defaultThreadFactory} is used, that creates threads to all
  78:  * be in the same {@link ThreadGroup} and with the same
  79:  * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
  80:  * a different ThreadFactory, you can alter the thread's name, thread
  81:  * group, priority, daemon status, etc. If a <tt>ThreadFactory</tt> fails to create
  82:  * a thread when asked by returning null from <tt>newThread</tt>,
  83:  * the executor will continue, but might
  84:  * not be able to execute any tasks. </dd>
  85:  *
  86:  * <dt>Keep-alive times</dt>
  87:  *
  88:  * <dd>If the pool currently has more than corePoolSize threads,
  89:  * excess threads will be terminated if they have been idle for more
  90:  * than the keepAliveTime (see {@link
  91:  * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
  92:  * reducing resource consumption when the pool is not being actively
  93:  * used. If the pool becomes more active later, new threads will be
  94:  * constructed. This parameter can also be changed dynamically using
  95:  * method {@link ThreadPoolExecutor#setKeepAliveTime}. Using a value
  96:  * of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS} effectively
  97:  * disables idle threads from ever terminating prior to shut down. By
  98:  * default, the keep-alive policy applies only when there are more
  99:  * than corePoolSizeThreads. But method {@link
 100:  * ThreadPoolExecutor#allowCoreThreadTimeOut} can be used to apply
 101:  * this time-out policy to core threads as well, so long as
 102:  * the keepAliveTime value is non-zero. </dd>
 103:  *
 104:  * <dt>Queuing</dt>
 105:  *
 106:  * <dd>Any {@link BlockingQueue} may be used to transfer and hold
 107:  * submitted tasks.  The use of this queue interacts with pool sizing:
 108:  *
 109:  * <ul>
 110:  *
 111:  * <li> If fewer than corePoolSize threads are running, the Executor
 112:  * always prefers adding a new thread
 113:  * rather than queuing.</li>
 114:  *
 115:  * <li> If corePoolSize or more threads are running, the Executor
 116:  * always prefers queuing a request rather than adding a new
 117:  * thread.</li>
 118:  *
 119:  * <li> If a request cannot be queued, a new thread is created unless
 120:  * this would exceed maximumPoolSize, in which case, the task will be
 121:  * rejected.</li>
 122:  *
 123:  * </ul>
 124:  *
 125:  * There are three general strategies for queuing:
 126:  * <ol>
 127:  *
 128:  * <li> <em> Direct handoffs.</em> A good default choice for a work
 129:  * queue is a {@link SynchronousQueue} that hands off tasks to threads
 130:  * without otherwise holding them. Here, an attempt to queue a task
 131:  * will fail if no threads are immediately available to run it, so a
 132:  * new thread will be constructed. This policy avoids lockups when
 133:  * handling sets of requests that might have internal dependencies.
 134:  * Direct handoffs generally require unbounded maximumPoolSizes to
 135:  * avoid rejection of new submitted tasks. This in turn admits the
 136:  * possibility of unbounded thread growth when commands continue to
 137:  * arrive on average faster than they can be processed.  </li>
 138:  *
 139:  * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 140:  * example a {@link LinkedBlockingQueue} without a predefined
 141:  * capacity) will cause new tasks to wait in the queue when all
 142:  * corePoolSize threads are busy. Thus, no more than corePoolSize
 143:  * threads will ever be created. (And the value of the maximumPoolSize
 144:  * therefore doesn't have any effect.)  This may be appropriate when
 145:  * each task is completely independent of others, so tasks cannot
 146:  * affect each others execution; for example, in a web page server.
 147:  * While this style of queuing can be useful in smoothing out
 148:  * transient bursts of requests, it admits the possibility of
 149:  * unbounded work queue growth when commands continue to arrive on
 150:  * average faster than they can be processed.  </li>
 151:  *
 152:  * <li><em>Bounded queues.</em> A bounded queue (for example, an
 153:  * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 154:  * used with finite maximumPoolSizes, but can be more difficult to
 155:  * tune and control.  Queue sizes and maximum pool sizes may be traded
 156:  * off for each other: Using large queues and small pools minimizes
 157:  * CPU usage, OS resources, and context-switching overhead, but can
 158:  * lead to artificially low throughput.  If tasks frequently block (for
 159:  * example if they are I/O bound), a system may be able to schedule
 160:  * time for more threads than you otherwise allow. Use of small queues
 161:  * generally requires larger pool sizes, which keeps CPUs busier but
 162:  * may encounter unacceptable scheduling overhead, which also
 163:  * decreases throughput.  </li>
 164:  *
 165:  * </ol>
 166:  *
 167:  * </dd>
 168:  *
 169:  * <dt>Rejected tasks</dt>
 170:  *
 171:  * <dd> New tasks submitted in method {@link
 172:  * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
 173:  * Executor has been shut down, and also when the Executor uses finite
 174:  * bounds for both maximum threads and work queue capacity, and is
 175:  * saturated.  In either case, the <tt>execute</tt> method invokes the
 176:  * {@link RejectedExecutionHandler#rejectedExecution} method of its
 177:  * {@link RejectedExecutionHandler}.  Four predefined handler policies
 178:  * are provided:
 179:  *
 180:  * <ol>
 181:  *
 182:  * <li> In the
 183:  * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
 184:  * runtime {@link RejectedExecutionException} upon rejection. </li>
 185:  *
 186:  * <li> In {@link
 187:  * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
 188:  * <tt>execute</tt> itself runs the task. This provides a simple
 189:  * feedback control mechanism that will slow down the rate that new
 190:  * tasks are submitted. </li>
 191:  *
 192:  * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
 193:  * a task that cannot be executed is simply dropped.  </li>
 194:  *
 195:  * <li>In {@link
 196:  * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
 197:  * shut down, the task at the head of the work queue is dropped, and
 198:  * then execution is retried (which can fail again, causing this to be
 199:  * repeated.) </li>
 200:  *
 201:  * </ol>
 202:  *
 203:  * It is possible to define and use other kinds of {@link
 204:  * RejectedExecutionHandler} classes. Doing so requires some care
 205:  * especially when policies are designed to work only under particular
 206:  * capacity or queuing policies. </dd>
 207:  *
 208:  * <dt>Hook methods</dt>
 209:  *
 210:  * <dd>This class provides <tt>protected</tt> overridable {@link
 211:  * ThreadPoolExecutor#beforeExecute} and {@link
 212:  * ThreadPoolExecutor#afterExecute} methods that are called before and
 213:  * after execution of each task.  These can be used to manipulate the
 214:  * execution environment; for example, reinitializing ThreadLocals,
 215:  * gathering statistics, or adding log entries. Additionally, method
 216:  * {@link ThreadPoolExecutor#terminated} can be overridden to perform
 217:  * any special processing that needs to be done once the Executor has
 218:  * fully terminated.
 219:  *
 220:  * <p>If hook or callback methods throw
 221:  * exceptions, internal worker threads may in turn fail and
 222:  * abruptly terminate.</dd>
 223:  *
 224:  * <dt>Queue maintenance</dt>
 225:  *
 226:  * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
 227:  * the work queue for purposes of monitoring and debugging.  Use of
 228:  * this method for any other purpose is strongly discouraged.  Two
 229:  * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
 230:  * ThreadPoolExecutor#purge} are available to assist in storage
 231:  * reclamation when large numbers of queued tasks become
 232:  * cancelled.</dd>
 233:  *
 234:  * <dt>Finalization</dt>
 235:  *
 236:  * <dd> A pool that is no longer referenced in a program <em>AND</em>
 237:  * has no remaining threads will be <tt>shutdown</tt>
 238:  * automatically. If you would like to ensure that unreferenced pools
 239:  * are reclaimed even if users forget to call {@link
 240:  * ThreadPoolExecutor#shutdown}, then you must arrange that unused
 241:  * threads eventually die, by setting appropriate keep-alive times,
 242:  * using a lower bound of zero core threads and/or setting {@link
 243:  * ThreadPoolExecutor#allowCoreThreadTimeOut}.  </dd> </dl>
 244:  *
 245:  * <p> <b>Extension example</b>. Most extensions of this class
 246:  * override one or more of the protected hook methods. For example,
 247:  * here is a subclass that adds a simple pause/resume feature:
 248:  *
 249:  * <pre>
 250:  * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 251:  *   private boolean isPaused;
 252:  *   private ReentrantLock pauseLock = new ReentrantLock();
 253:  *   private Condition unpaused = pauseLock.newCondition();
 254:  *
 255:  *   public PausableThreadPoolExecutor(...) { super(...); }
 256:  *
 257:  *   protected void beforeExecute(Thread t, Runnable r) {
 258:  *     super.beforeExecute(t, r);
 259:  *     pauseLock.lock();
 260:  *     try {
 261:  *       while (isPaused) unpaused.await();
 262:  *     } catch (InterruptedException ie) {
 263:  *       t.interrupt();
 264:  *     } finally {
 265:  *       pauseLock.unlock();
 266:  *     }
 267:  *   }
 268:  *
 269:  *   public void pause() {
 270:  *     pauseLock.lock();
 271:  *     try {
 272:  *       isPaused = true;
 273:  *     } finally {
 274:  *       pauseLock.unlock();
 275:  *     }
 276:  *   }
 277:  *
 278:  *   public void resume() {
 279:  *     pauseLock.lock();
 280:  *     try {
 281:  *       isPaused = false;
 282:  *       unpaused.signalAll();
 283:  *     } finally {
 284:  *       pauseLock.unlock();
 285:  *     }
 286:  *   }
 287:  * }
 288:  * </pre>
 289:  * @since 1.5
 290:  * @author Doug Lea
 291:  */
 292: public class ThreadPoolExecutor extends AbstractExecutorService {
 293:     /**
 294:      * Only used to force toArray() to produce a Runnable[].
 295:      */
 296:     private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
 297: 
 298:     /**
 299:      * Permission for checking shutdown
 300:      */
 301:     private static final RuntimePermission shutdownPerm =
 302:         new RuntimePermission("modifyThread");
 303: 
 304:     /**
 305:      * Queue used for holding tasks and handing off to worker threads.
 306:      */
 307:     private final BlockingQueue<Runnable> workQueue;
 308: 
 309:     /**
 310:      * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
 311:      * workers set.
 312:      */
 313:     private final ReentrantLock mainLock = new ReentrantLock();
 314: 
 315:     /**
 316:      * Wait condition to support awaitTermination
 317:      */
 318:     private final Condition termination = mainLock.newCondition();
 319: 
 320:     /**
 321:      * Set containing all worker threads in pool.
 322:      */
 323:     private final HashSet<Worker> workers = new HashSet<Worker>();
 324: 
 325:     /**
 326:      * Timeout in nanoseconds for idle threads waiting for work.
 327:      * Threads use this timeout only when there are more than
 328:      * corePoolSize present. Otherwise they wait forever for new work.
 329:      */
 330:     private volatile long  keepAliveTime;
 331: 
 332:     /**
 333:      * If false (default) core threads stay alive even when idle.
 334:      * If true, core threads use keepAliveTime to time out waiting for work.
 335:      */
 336:     private volatile boolean allowCoreThreadTimeOut;
 337: 
 338:     /**
 339:      * Core pool size, updated only while holding mainLock,
 340:      * but volatile to allow concurrent readability even
 341:      * during updates.
 342:      */
 343:     private volatile int   corePoolSize;
 344: 
 345:     /**
 346:      * Maximum pool size, updated only while holding mainLock
 347:      * but volatile to allow concurrent readability even
 348:      * during updates.
 349:      */
 350:     private volatile int   maximumPoolSize;
 351: 
 352:     /**
 353:      * Current pool size, updated only while holding mainLock
 354:      * but volatile to allow concurrent readability even
 355:      * during updates.
 356:      */
 357:     private volatile int   poolSize;
 358: 
 359:     /**
 360:      * Lifecycle state
 361:      */
 362:     volatile int runState;
 363: 
 364:     // Special values for runState
 365:     /** Normal, not-shutdown mode */
 366:     static final int RUNNING    = 0;
 367:     /** Controlled shutdown mode */
 368:     static final int SHUTDOWN   = 1;
 369:     /** Immediate shutdown mode */
 370:     static final int STOP       = 2;
 371:     /** Final state */
 372:     static final int TERMINATED = 3;
 373: 
 374:     /**
 375:      * Handler called when saturated or shutdown in execute.
 376:      */
 377:     private volatile RejectedExecutionHandler handler;
 378: 
 379:     /**
 380:      * Factory for new threads.
 381:      */
 382:     private volatile ThreadFactory threadFactory;
 383: 
 384:     /**
 385:      * Tracks largest attained pool size.
 386:      */
 387:     private int largestPoolSize;
 388: 
 389:     /**
 390:      * Counter for completed tasks. Updated only on termination of
 391:      * worker threads.
 392:      */
 393:     private long completedTaskCount;
 394: 
 395:     /**
 396:      * The default rejected execution handler
 397:      */
 398:     private static final RejectedExecutionHandler defaultHandler =
 399:         new AbortPolicy();
 400: 
 401:     /**
 402:      * Invokes the rejected execution handler for the given command.
 403:      */
 404:     void reject(Runnable command) {
 405:         handler.rejectedExecution(command, this);
 406:     }
 407: 
 408:     /**
 409:      * Creates and returns a new thread running firstTask as its first
 410:      * task. Call only while holding mainLock.
 411:      * @param firstTask the task the new thread should run first (or
 412:      * null if none)
 413:      * @return the new thread, or null if threadFactory fails to create thread
 414:      */
 415:     private Thread addThread(Runnable firstTask) {
 416:         if (runState == TERMINATED) // Don't create thread if terminated
 417:             return null;
 418:         Worker w = new Worker(firstTask);
 419:         Thread t = threadFactory.newThread(w);
 420:         if (t != null) {
 421:             w.thread = t;
 422:             workers.add(w);
 423:             int nt = ++poolSize;
 424:             if (nt > largestPoolSize)
 425:                 largestPoolSize = nt;
 426:         }
 427:         return t;
 428:     }
 429: 
 430:     /**
 431:      * Creates and starts a new thread running firstTask as its first
 432:      * task, only if fewer than corePoolSize threads are running.
 433:      * @param firstTask the task the new thread should run first (or
 434:      * null if none)
 435:      * @return true if successful.
 436:      */
 437:     private boolean addIfUnderCorePoolSize(Runnable firstTask) {
 438:         Thread t = null;
 439:         final ReentrantLock mainLock = this.mainLock;
 440:         mainLock.lock();
 441:         try {
 442:             if (poolSize < corePoolSize)
 443:                 t = addThread(firstTask);
 444:         } finally {
 445:             mainLock.unlock();
 446:         }
 447:         if (t == null)
 448:             return false;
 449:         t.start();
 450:         return true;
 451:     }
 452: 
 453:     /**
 454:      * Creates and starts a new thread only if fewer than maximumPoolSize
 455:      * threads are running.  The new thread runs as its first task the
 456:      * next task in queue, or if there is none, the given task.
 457:      * @param firstTask the task the new thread should run first (or
 458:      * null if none)
 459:      * @return 0 if a new thread cannot be created, a positive number
 460:      * if firstTask will be run in a new thread, or a negative number
 461:      * if a new thread was created but is running some other task, in
 462:      * which case the caller must try some other way to run firstTask
 463:      * (perhaps by calling this method again).
 464:      */
 465:     private int addIfUnderMaximumPoolSize(Runnable firstTask) {
 466:         Thread t = null;
 467:         int status = 0;
 468:         final ReentrantLock mainLock = this.mainLock;
 469:         mainLock.lock();
 470:         try {
 471:             if (poolSize < maximumPoolSize) {
 472:                 Runnable next = workQueue.poll();
 473:                 if (next == null) {
 474:                     next = firstTask;
 475:                     status = 1;
 476:                 } else
 477:                     status = -1;
 478:                 t = addThread(next);
 479:             }
 480:         } finally {
 481:             mainLock.unlock();
 482:         }
 483:         if (t == null)
 484:             return 0;
 485:         t.start();
 486:         return status;
 487:     }
 488: 
 489: 
 490:     /**
 491:      * Gets the next task for a worker thread to run.
 492:      * @return the task
 493:      */
 494:     Runnable getTask() {
 495:         for (;;) {
 496:             try {
 497:                 switch (runState) {
 498:                 case RUNNING: {
 499:                     // untimed wait if core and not allowing core timeout
 500:                     if (poolSize <= corePoolSize && !allowCoreThreadTimeOut)
 501:                         return workQueue.take();
 502: 
 503:                     long timeout = keepAliveTime;
 504:                     if (timeout <= 0) // die immediately for 0 timeout
 505:                         return null;
 506:                     Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
 507:                     if (r != null)
 508:                         return r;
 509:                     if (poolSize > corePoolSize || allowCoreThreadTimeOut)
 510:                         return null; // timed out
 511:                     // Else, after timeout, the pool shrank. Retry
 512:                     break;
 513:                 }
 514: 
 515:                 case SHUTDOWN: {
 516:                     // Help drain queue
 517:                     Runnable r = workQueue.poll();
 518:                     if (r != null)
 519:                         return r;
 520: 
 521:                     // Check if can terminate
 522:                     if (workQueue.isEmpty()) {
 523:                         interruptIdleWorkers();
 524:                         return null;
 525:                     }
 526: 
 527:                     // Else there could still be delayed tasks in queue.
 528:                     return workQueue.take();
 529:                 }
 530: 
 531:                 case STOP:
 532:                     return null;
 533:                 default:
 534:                     assert false;
 535:                 }
 536:             } catch (InterruptedException ie) {
 537:                 // On interruption, re-check runstate
 538:             }
 539:         }
 540:     }
 541: 
 542:     /**
 543:      * Wakes up all threads that might be waiting for tasks.
 544:      */
 545:     void interruptIdleWorkers() {
 546:         final ReentrantLock mainLock = this.mainLock;
 547:         mainLock.lock();
 548:         try {
 549:             for (Worker w : workers)
 550:                 w.interruptIfIdle();
 551:         } finally {
 552:             mainLock.unlock();
 553:         }
 554:     }
 555: 
 556:     /**
 557:      * Performs bookkeeping for a terminated worker thread.
 558:      * @param w the worker
 559:      */
 560:     void workerDone(Worker w) {
 561:         final ReentrantLock mainLock = this.mainLock;
 562:         mainLock.lock();
 563:         try {
 564:             completedTaskCount += w.completedTasks;
 565:             workers.remove(w);
 566:             if (--poolSize > 0)
 567:                 return;
 568: 
 569:             // Else, this is the last thread. Deal with potential shutdown.
 570: 
 571:             int state = runState;
 572:             assert state != TERMINATED;
 573: 
 574:             if (state != STOP) {
 575:                 // If there are queued tasks but no threads, create
 576:                 // replacement thread. We must create it initially
 577:                 // idle to avoid orphaned tasks in case addThread
 578:                 // fails.  This also handles case of delayed tasks
 579:                 // that will sometime later become runnable.
 580:                 if (!workQueue.isEmpty()) {
 581:                     Thread t = addThread(null);
 582:                     if (t != null)
 583:                         t.start();
 584:                     return;
 585:                 }
 586: 
 587:                 // Otherwise, we can exit without replacement
 588:                 if (state == RUNNING)
 589:                     return;
 590:             }
 591: 
 592:             // Either state is STOP, or state is SHUTDOWN and there is
 593:             // no work to do. So we can terminate.
 594:             termination.signalAll();
 595:             runState = TERMINATED;
 596:             // fall through to call terminate() outside of lock.
 597:         } finally {
 598:             mainLock.unlock();
 599:         }
 600: 
 601:         assert runState == TERMINATED;
 602:         terminated();
 603:     }
 604: 
 605:     /**
 606:      *  Worker threads
 607:      */
 608:     private class Worker implements Runnable {
 609: 
 610:         /**
 611:          * The runLock is acquired and released surrounding each task
 612:          * execution. It mainly protects against interrupts that are
 613:          * intended to cancel the worker thread from instead
 614:          * interrupting the task being run.
 615:          */
 616:         private final ReentrantLock runLock = new ReentrantLock();
 617: 
 618:         /**
 619:          * Initial task to run before entering run loop
 620:          */
 621:         private Runnable firstTask;
 622: 
 623:         /**
 624:          * Per thread completed task counter; accumulated
 625:          * into completedTaskCount upon termination.
 626:          */
 627:         volatile long completedTasks;
 628: 
 629:         /**
 630:          * Thread this worker is running in.  Acts as a final field,
 631:          * but cannot be set until thread is created.
 632:          */
 633:         Thread thread;
 634: 
 635:         Worker(Runnable firstTask) {
 636:             this.firstTask = firstTask;
 637:         }
 638: 
 639:         boolean isActive() {
 640:             return runLock.isLocked();
 641:         }
 642: 
 643:         /**
 644:          * Interrupts thread if not running a task.
 645:          */
 646:         void interruptIfIdle() {
 647:             final ReentrantLock runLock = this.runLock;
 648:             if (runLock.tryLock()) {
 649:                 try {
 650:                     thread.interrupt();
 651:                 } finally {
 652:                     runLock.unlock();
 653:                 }
 654:             }
 655:         }
 656: 
 657:         /**
 658:          * Interrupts thread even if running a task.
 659:          */
 660:         void interruptNow() {
 661:             thread.interrupt();
 662:         }
 663: 
 664:         /**
 665:          * Runs a single task between before/after methods.
 666:          */
 667:         private void runTask(Runnable task) {
 668:             final ReentrantLock runLock = this.runLock;
 669:             runLock.lock();
 670:             try {
 671:                 // If not shutting down then clear an outstanding interrupt.
 672:                 if (runState != STOP && 
 673:                     Thread.interrupted() && 
 674:                     runState == STOP) // Re-interrupt if stopped after clearing
 675:                     thread.interrupt();
 676:                 boolean ran = false;
 677:                 beforeExecute(thread, task);
 678:                 try {
 679:                     task.run();
 680:                     ran = true;
 681:                     afterExecute(task, null);
 682:                     ++completedTasks;
 683:                 } catch (RuntimeException ex) {
 684:                     if (!ran)
 685:                         afterExecute(task, ex);
 686:                     // Else the exception occurred within
 687:                     // afterExecute itself in which case we don't
 688:                     // want to call it again.
 689:                     throw ex;
 690:                 }
 691:             } finally {
 692:                 runLock.unlock();
 693:             }
 694:         }
 695: 
 696:         /**
 697:          * Main run loop
 698:          */
 699:         public void run() {
 700:             try {
 701:                 Runnable task = firstTask;
 702:                 firstTask = null;
 703:                 while (task != null || (task = getTask()) != null) {
 704:                     runTask(task);
 705:                     task = null; // unnecessary but can help GC
 706:                 }
 707:             } finally {
 708:                 workerDone(this);
 709:             }
 710:         }
 711:     }
 712: 
 713:     // Public methods
 714: 
 715:     /**
 716:      * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
 717:      * parameters and default thread factory and rejected execution handler.
 718:      * It may be more convenient to use one of the {@link Executors} factory
 719:      * methods instead of this general purpose constructor.
 720:      *
 721:      * @param corePoolSize the number of threads to keep in the
 722:      * pool, even if they are idle.
 723:      * @param maximumPoolSize the maximum number of threads to allow in the
 724:      * pool.
 725:      * @param keepAliveTime when the number of threads is greater than
 726:      * the core, this is the maximum time that excess idle threads
 727:      * will wait for new tasks before terminating.
 728:      * @param unit the time unit for the keepAliveTime
 729:      * argument.
 730:      * @param workQueue the queue to use for holding tasks before they
 731:      * are executed. This queue will hold only the <tt>Runnable</tt>
 732:      * tasks submitted by the <tt>execute</tt> method.
 733:      * @throws IllegalArgumentException if corePoolSize, or
 734:      * keepAliveTime less than zero, or if maximumPoolSize less than or
 735:      * equal to zero, or if corePoolSize greater than maximumPoolSize.
 736:      * @throws NullPointerException if <tt>workQueue</tt> is null
 737:      */
 738:     public ThreadPoolExecutor(int corePoolSize,
 739:                               int maximumPoolSize,
 740:                               long keepAliveTime,
 741:                               TimeUnit unit,
 742:                               BlockingQueue<Runnable> workQueue) {
 743:         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
 744:              Executors.defaultThreadFactory(), defaultHandler);
 745:     }
 746: 
 747:     /**
 748:      * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
 749:      * parameters and default rejected execution handler.
 750:      *
 751:      * @param corePoolSize the number of threads to keep in the
 752:      * pool, even if they are idle.
 753:      * @param maximumPoolSize the maximum number of threads to allow in the
 754:      * pool.
 755:      * @param keepAliveTime when the number of threads is greater than
 756:      * the core, this is the maximum time that excess idle threads
 757:      * will wait for new tasks before terminating.
 758:      * @param unit the time unit for the keepAliveTime
 759:      * argument.
 760:      * @param workQueue the queue to use for holding tasks before they
 761:      * are executed. This queue will hold only the <tt>Runnable</tt>
 762:      * tasks submitted by the <tt>execute</tt> method.
 763:      * @param threadFactory the factory to use when the executor
 764:      * creates a new thread.
 765:      * @throws IllegalArgumentException if corePoolSize, or
 766:      * keepAliveTime less than zero, or if maximumPoolSize less than or
 767:      * equal to zero, or if corePoolSize greater than maximumPoolSize.
 768:      * @throws NullPointerException if <tt>workQueue</tt>
 769:      * or <tt>threadFactory</tt> are null.
 770:      */
 771:     public ThreadPoolExecutor(int corePoolSize,
 772:                               int maximumPoolSize,
 773:                               long keepAliveTime,
 774:                               TimeUnit unit,
 775:                               BlockingQueue<Runnable> workQueue,
 776:                               ThreadFactory threadFactory) {
 777:         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
 778:              threadFactory, defaultHandler);
 779:     }
 780: 
 781:     /**
 782:      * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
 783:      * parameters and default thread factory.
 784:      *
 785:      * @param corePoolSize the number of threads to keep in the
 786:      * pool, even if they are idle.
 787:      * @param maximumPoolSize the maximum number of threads to allow in the
 788:      * pool.
 789:      * @param keepAliveTime when the number of threads is greater than
 790:      * the core, this is the maximum time that excess idle threads
 791:      * will wait for new tasks before terminating.
 792:      * @param unit the time unit for the keepAliveTime
 793:      * argument.
 794:      * @param workQueue the queue to use for holding tasks before they
 795:      * are executed. This queue will hold only the <tt>Runnable</tt>
 796:      * tasks submitted by the <tt>execute</tt> method.
 797:      * @param handler the handler to use when execution is blocked
 798:      * because the thread bounds and queue capacities are reached.
 799:      * @throws IllegalArgumentException if corePoolSize, or
 800:      * keepAliveTime less than zero, or if maximumPoolSize less than or
 801:      * equal to zero, or if corePoolSize greater than maximumPoolSize.
 802:      * @throws NullPointerException if <tt>workQueue</tt>
 803:      * or <tt>handler</tt> are null.
 804:      */
 805:     public ThreadPoolExecutor(int corePoolSize,
 806:                               int maximumPoolSize,
 807:                               long keepAliveTime,
 808:                               TimeUnit unit,
 809:                               BlockingQueue<Runnable> workQueue,
 810:                               RejectedExecutionHandler handler) {
 811:         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
 812:              Executors.defaultThreadFactory(), handler);
 813:     }
 814: 
 815:     /**
 816:      * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
 817:      * parameters.
 818:      *
 819:      * @param corePoolSize the number of threads to keep in the
 820:      * pool, even if they are idle.
 821:      * @param maximumPoolSize the maximum number of threads to allow in the
 822:      * pool.
 823:      * @param keepAliveTime when the number of threads is greater than
 824:      * the core, this is the maximum time that excess idle threads
 825:      * will wait for new tasks before terminating.
 826:      * @param unit the time unit for the keepAliveTime
 827:      * argument.
 828:      * @param workQueue the queue to use for holding tasks before they
 829:      * are executed. This queue will hold only the <tt>Runnable</tt>
 830:      * tasks submitted by the <tt>execute</tt> method.
 831:      * @param threadFactory the factory to use when the executor
 832:      * creates a new thread.
 833:      * @param handler the handler to use when execution is blocked
 834:      * because the thread bounds and queue capacities are reached.
 835:      * @throws IllegalArgumentException if corePoolSize, or
 836:      * keepAliveTime less than zero, or if maximumPoolSize less than or
 837:      * equal to zero, or if corePoolSize greater than maximumPoolSize.
 838:      * @throws NullPointerException if <tt>workQueue</tt>
 839:      * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
 840:      */
 841:     public ThreadPoolExecutor(int corePoolSize,
 842:                               int maximumPoolSize,
 843:                               long keepAliveTime,
 844:                               TimeUnit unit,
 845:                               BlockingQueue<Runnable> workQueue,
 846:                               ThreadFactory threadFactory,
 847:                               RejectedExecutionHandler handler) {
 848:         if (corePoolSize < 0 ||
 849:             maximumPoolSize <= 0 ||
 850:             maximumPoolSize < corePoolSize ||
 851:             keepAliveTime < 0)
 852:             throw new IllegalArgumentException();
 853:         if (workQueue == null || threadFactory == null || handler == null)
 854:             throw new NullPointerException();
 855:         this.corePoolSize = corePoolSize;
 856:         this.maximumPoolSize = maximumPoolSize;
 857:         this.workQueue = workQueue;
 858:         this.keepAliveTime = unit.toNanos(keepAliveTime);
 859:         this.threadFactory = threadFactory;
 860:         this.handler = handler;
 861:     }
 862: 
 863: 
 864:     /**
 865:      * Executes the given task sometime in the future.  The task
 866:      * may execute in a new thread or in an existing pooled thread.
 867:      *
 868:      * If the task cannot be submitted for execution, either because this
 869:      * executor has been shutdown or because its capacity has been reached,
 870:      * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
 871:      *
 872:      * @param command the task to execute
 873:      * @throws RejectedExecutionException at discretion of
 874:      * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
 875:      * for execution
 876:      * @throws NullPointerException if command is null
 877:      */
 878:     public void execute(Runnable command) {
 879:         if (command == null)
 880:             throw new NullPointerException();
 881:         for (;;) {
 882:             if (runState != RUNNING) {
 883:                 reject(command);
 884:                 return;
 885:             }
 886:             if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
 887:                 return;
 888:             if (workQueue.offer(command))
 889:                 return;
 890:             int status = addIfUnderMaximumPoolSize(command);
 891:             if (status > 0)      // created new thread
 892:                 return;
 893:             if (status == 0) {   // failed to create thread
 894:                 reject(command);
 895:                 return;
 896:             }
 897:             // Retry if created a new thread but it is busy with another task
 898:         }
 899:     }
 900: 
 901:     /**
 902:      * Initiates an orderly shutdown in which previously submitted
 903:      * tasks are executed, but no new tasks will be
 904:      * accepted. Invocation has no additional effect if already shut
 905:      * down.
 906:      * @throws SecurityException if a security manager exists and
 907:      * shutting down this ExecutorService may manipulate threads that
 908:      * the caller is not permitted to modify because it does not hold
 909:      * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
 910:      * or the security manager's <tt>checkAccess</tt> method denies access.
 911:      */
 912:     public void shutdown() {
 913:         // Fail if caller doesn't have modifyThread permission.
 914:     SecurityManager security = System.getSecurityManager();
 915:     if (security != null)
 916:             security.checkPermission(shutdownPerm);
 917: 
 918:         boolean fullyTerminated = false;
 919:         final ReentrantLock mainLock = this.mainLock;
 920:         mainLock.lock();
 921:         try {
 922:             if (workers.size() > 0) {
 923:                 // Check if caller can modify worker threads.  This
 924:                 // might not be true even if passed above check, if
 925:                 // the SecurityManager treats some threads specially.
 926:                 if (security != null) {
 927:                     for (Worker w: workers)
 928:                         security.checkAccess(w.thread);
 929:                 }
 930: 
 931:                 int state = runState;
 932:                 if (state == RUNNING) // don't override shutdownNow
 933:                     runState = SHUTDOWN;
 934: 
 935:                 try {
 936:                     for (Worker w: workers)
 937:                         w.interruptIfIdle();
 938:                 } catch (SecurityException se) {
 939:                     // If SecurityManager allows above checks, but
 940:                     // then unexpectedly throws exception when
 941:                     // interrupting threads (which it ought not do),
 942:                     // back out as cleanly as we can. Some threads may
 943:                     // have been killed but we remain in non-shutdown
 944:                     // state.
 945:                     runState = state;
 946:                     throw se;
 947:                 }
 948:             }
 949:             else { // If no workers, trigger full termination now
 950:                 fullyTerminated = true;
 951:                 runState = TERMINATED;
 952:                 termination.signalAll();
 953:             }
 954:         } finally {
 955:             mainLock.unlock();
 956:         }
 957:         if (fullyTerminated)
 958:             terminated();
 959:     }
 960: 
 961: 
 962:     /**
 963:      * Attempts to stop all actively executing tasks, halts the
 964:      * processing of waiting tasks, and returns a list of the tasks
 965:      * that were awaiting execution.
 966:      *
 967:      * <p>There are no guarantees beyond best-effort attempts to stop
 968:      * processing actively executing tasks.  This implementation
 969:      * cancels tasks via {@link Thread#interrupt}, so any task that
 970:      * fails to respond to interrupts may never terminate.
 971:      *
 972:      * @return list of tasks that never commenced execution
 973:      * @throws SecurityException if a security manager exists and
 974:      * shutting down this ExecutorService may manipulate threads that
 975:      * the caller is not permitted to modify because it does not hold
 976:      * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
 977:      * or the security manager's <tt>checkAccess</tt> method denies access.
 978:      */
 979:     public List<Runnable> shutdownNow() {
 980:         // Almost the same code as shutdown()
 981:     SecurityManager security = System.getSecurityManager();
 982:     if (security != null)
 983:             security.checkPermission(shutdownPerm);
 984: 
 985:         boolean fullyTerminated = false;
 986:         final ReentrantLock mainLock = this.mainLock;
 987:         mainLock.lock();
 988:         try {
 989:             if (workers.size() > 0) {
 990:                 if (security != null) {
 991:                     for (Worker w: workers)
 992:                         security.checkAccess(w.thread);
 993:                 }
 994: 
 995:                 int state = runState;
 996:                 if (state != TERMINATED)
 997:                     runState = STOP;
 998:                 try {
 999:                     for (Worker w : workers)
1000:                         w.interruptNow();
1001:                 } catch (SecurityException se) {
1002:                     runState = state; // back out;
1003:                     throw se;
1004:                 }
1005:             }
1006:             else { // If no workers, trigger full termination now
1007:                 fullyTerminated = true;
1008:                 runState = TERMINATED;
1009:                 termination.signalAll();
1010:             }
1011:         } finally {
1012:             mainLock.unlock();
1013:         }
1014:         if (fullyTerminated)
1015:             terminated();
1016:         return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
1017:     }
1018: 
1019:     public boolean isShutdown() {
1020:         return runState != RUNNING;
1021:     }
1022: 
1023:     /**
1024:      * Returns true if this executor is in the process of terminating
1025:      * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
1026:      * completely terminated.  This method may be useful for
1027:      * debugging. A return of <tt>true</tt> reported a sufficient
1028:      * period after shutdown may indicate that submitted tasks have
1029:      * ignored or suppressed interruption, causing this executor not
1030:      * to properly terminate.
1031:      * @return true if terminating but not yet terminated.
1032:      */
1033:     public boolean isTerminating() {
1034:         return runState == STOP;
1035:     }
1036: 
1037:     public boolean isTerminated() {
1038:         return runState == TERMINATED;
1039:     }
1040: 
1041:     public boolean awaitTermination(long timeout, TimeUnit unit)
1042:         throws InterruptedException {
1043:         long nanos = unit.toNanos(timeout);
1044:         final ReentrantLock mainLock = this.mainLock;
1045:         mainLock.lock();
1046:         try {
1047:             for (;;) {
1048:                 if (runState == TERMINATED)
1049:                     return true;
1050:                 if (nanos <= 0)
1051:                     return false;
1052:                 nanos = termination.awaitNanos(nanos);
1053:             }
1054:         } finally {
1055:             mainLock.unlock();
1056:         }
1057:     }
1058: 
1059:     /**
1060:      * Invokes <tt>shutdown</tt> when this executor is no longer
1061:      * referenced.
1062:      */
1063:     protected void finalize()  {
1064:         shutdown();
1065:     }
1066: 
1067:     /**
1068:      * Sets the thread factory used to create new threads.
1069:      *
1070:      * @param threadFactory the new thread factory
1071:      * @throws NullPointerException if threadFactory is null
1072:      * @see #getThreadFactory
1073:      */
1074:     public void setThreadFactory(ThreadFactory threadFactory) {
1075:         if (threadFactory == null)
1076:             throw new NullPointerException();
1077:         this.threadFactory = threadFactory;
1078:     }
1079: 
1080:     /**
1081:      * Returns the thread factory used to create new threads.
1082:      *
1083:      * @return the current thread factory
1084:      * @see #setThreadFactory
1085:      */
1086:     public ThreadFactory getThreadFactory() {
1087:         return threadFactory;
1088:     }
1089: 
1090:     /**
1091:      * Sets a new handler for unexecutable tasks.
1092:      *
1093:      * @param handler the new handler
1094:      * @throws NullPointerException if handler is null
1095:      * @see #getRejectedExecutionHandler
1096:      */
1097:     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
1098:         if (handler == null)
1099:             throw new NullPointerException();
1100:         this.handler = handler;
1101:     }
1102: 
1103:     /**
1104:      * Returns the current handler for unexecutable tasks.
1105:      *
1106:      * @return the current handler
1107:      * @see #setRejectedExecutionHandler
1108:      */
1109:     public RejectedExecutionHandler getRejectedExecutionHandler() {
1110:         return handler;
1111:     }
1112: 
1113:     /**
1114:      * Returns the task queue used by this executor. Access to the
1115:      * task queue is intended primarily for debugging and monitoring.
1116:      * This queue may be in active use.  Retrieving the task queue
1117:      * does not prevent queued tasks from executing.
1118:      *
1119:      * @return the task queue
1120:      */
1121:     public BlockingQueue<Runnable> getQueue() {
1122:         return workQueue;
1123:     }
1124: 
1125:     /**
1126:      * Removes this task from the executor's internal queue if it is
1127:      * present, thus causing it not to be run if it has not already
1128:      * started.
1129:      *
1130:      * <p> This method may be useful as one part of a cancellation
1131:      * scheme.  It may fail to remove tasks that have been converted
1132:      * into other forms before being placed on the internal queue. For
1133:      * example, a task entered using <tt>submit</tt> might be
1134:      * converted into a form that maintains <tt>Future</tt> status.
1135:      * However, in such cases, method {@link ThreadPoolExecutor#purge}
1136:      * may be used to remove those Futures that have been cancelled.
1137:      *
1138:      * @param task the task to remove
1139:      * @return true if the task was removed
1140:      */
1141:     public boolean remove(Runnable task) {
1142:         return getQueue().remove(task);
1143:     }
1144: 
1145: 
1146:     /**
1147:      * Tries to remove from the work queue all {@link Future}
1148:      * tasks that have been cancelled. This method can be useful as a
1149:      * storage reclamation operation, that has no other impact on
1150:      * functionality. Cancelled tasks are never executed, but may
1151:      * accumulate in work queues until worker threads can actively
1152:      * remove them. Invoking this method instead tries to remove them now.
1153:      * However, this method may fail to remove tasks in
1154:      * the presence of interference by other threads.
1155:      */
1156:     public void purge() {
1157:         // Fail if we encounter interference during traversal
1158:         try {
1159:             Iterator<Runnable> it = getQueue().iterator();
1160:             while (it.hasNext()) {
1161:                 Runnable r = it.next();
1162:                 if (r instanceof Future<?>) {
1163:                     Future<?> c = (Future<?>)r;
1164:                     if (c.isCancelled())
1165:                         it.remove();
1166:                 }
1167:             }
1168:         }
1169:         catch (ConcurrentModificationException ex) {
1170:             return;
1171:         }
1172:     }
1173: 
1174:     /**
1175:      * Sets the core number of threads.  This overrides any value set
1176:      * in the constructor.  If the new value is smaller than the
1177:      * current value, excess existing threads will be terminated when
1178:      * they next become idle. If larger, new threads will, if needed,
1179:      * be started to execute any queued tasks.
1180:      *
1181:      * @param corePoolSize the new core size
1182:      * @throws IllegalArgumentException if <tt>corePoolSize</tt>
1183:      * less than zero
1184:      * @see #getCorePoolSize
1185:      */
1186:     public void setCorePoolSize(int corePoolSize) {
1187:         if (corePoolSize < 0)
1188:             throw new IllegalArgumentException();
1189:         final ReentrantLock mainLock = this.mainLock;
1190:         mainLock.lock();
1191:         try {
1192:             int extra = this.corePoolSize - corePoolSize;
1193:             this.corePoolSize = corePoolSize;
1194:             if (extra < 0) {
1195:                 int n = workQueue.size();
1196:                 // We have to create initially-idle threads here
1197:                 // because we otherwise have no recourse about
1198:                 // what to do with a dequeued task if addThread fails.
1199:                 while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize ) {
1200:                     Thread t = addThread(null);
1201:                     if (t != null)
1202:                         t.start();
1203:                     else
1204:                         break;
1205:                 }
1206:             }
1207:             else if (extra > 0 && poolSize > corePoolSize) {
1208:                 Iterator<Worker> it = workers.iterator();
1209:                 while (it.hasNext() &&
1210:                        extra-- > 0 &&
1211:                        poolSize > corePoolSize &&
1212:                        workQueue.remainingCapacity() == 0)
1213:                     it.next().interruptIfIdle();
1214:             }
1215:         } finally {
1216:             mainLock.unlock();
1217:         }
1218:     }
1219: 
1220:     /**
1221:      * Returns the core number of threads.
1222:      *
1223:      * @return the core number of threads
1224:      * @see #setCorePoolSize
1225:      */
1226:     public int getCorePoolSize() {
1227:         return corePoolSize;
1228:     }
1229: 
1230:     /**
1231:      * Starts a core thread, causing it to idly wait for work. This
1232:      * overrides the default policy of starting core threads only when
1233:      * new tasks are executed. This method will return <tt>false</tt>
1234:      * if all core threads have already been started.
1235:      * @return true if a thread was started
1236:      */
1237:     public boolean prestartCoreThread() {
1238:         return addIfUnderCorePoolSize(null);
1239:     }
1240: 
1241:     /**
1242:      * Starts all core threads, causing them to idly wait for work. This
1243:      * overrides the default policy of starting core threads only when
1244:      * new tasks are executed.
1245:      * @return the number of threads started.
1246:      */
1247:     public int prestartAllCoreThreads() {
1248:         int n = 0;
1249:         while (addIfUnderCorePoolSize(null))
1250:             ++n;
1251:         return n;
1252:     }
1253: 
1254:     /**
1255:      * Returns true if this pool allows core threads to time out and
1256:      * terminate if no tasks arrive within the keepAlive time, being
1257:      * replaced if needed when new tasks arrive. When true, the same
1258:      * keep-alive policy applying to non-core threads applies also to
1259:      * core threads. When false (the default), core threads are never
1260:      * terminated due to lack of incoming tasks.
1261:      * @return <tt>true</tt> if core threads are allowed to time out,
1262:      * else <tt>false</tt>
1263:      *
1264:      * @since 1.6
1265:      */
1266:     public boolean allowsCoreThreadTimeOut() {
1267:         return allowCoreThreadTimeOut;
1268:     }
1269: 
1270:     /**
1271:      * Sets the policy governing whether core threads may time out and
1272:      * terminate if no tasks arrive within the keep-alive time, being
1273:      * replaced if needed when new tasks arrive. When false, core
1274:      * threads are never terminated due to lack of incoming
1275:      * tasks. When true, the same keep-alive policy applying to
1276:      * non-core threads applies also to core threads. To avoid
1277:      * continual thread replacement, the keep-alive time must be
1278:      * greater than zero when setting <tt>true</tt>. This method
1279:      * should in general be called before the pool is actively used.
1280:      * @param value <tt>true</tt> if should time out, else <tt>false</tt>
1281:      * @throws IllegalArgumentException if value is <tt>true</tt>
1282:      * and the current keep-alive time is not greater than zero.
1283:      *
1284:      * @since 1.6
1285:      */
1286:     public void allowCoreThreadTimeOut(boolean value) {
1287:         if (value && keepAliveTime <= 0)
1288:             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
1289: 
1290:         allowCoreThreadTimeOut = value;
1291:     }
1292: 
1293:     /**
1294:      * Sets the maximum allowed number of threads. This overrides any
1295:      * value set in the constructor. If the new value is smaller than
1296:      * the current value, excess existing threads will be
1297:      * terminated when they next become idle.
1298:      *
1299:      * @param maximumPoolSize the new maximum
1300:      * @throws IllegalArgumentException if the new maximum is
1301:      *         less than or equal to zero, or
1302:      *         less than the {@linkplain #getCorePoolSize core pool size}
1303:      * @see #getMaximumPoolSize
1304:      */
1305:     public void setMaximumPoolSize(int maximumPoolSize) {
1306:         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
1307:             throw new IllegalArgumentException();
1308:         final ReentrantLock mainLock = this.mainLock;
1309:         mainLock.lock();
1310:         try {
1311:             int extra = this.maximumPoolSize - maximumPoolSize;
1312:             this.maximumPoolSize = maximumPoolSize;
1313:             if (extra > 0 && poolSize > maximumPoolSize) {
1314:                 Iterator<Worker> it = workers.iterator();
1315:                 while (it.hasNext() &&
1316:                        extra > 0 &&
1317:                        poolSize > maximumPoolSize) {
1318:                     it.next().interruptIfIdle();
1319:                     --extra;
1320:                 }
1321:             }
1322:         } finally {
1323:             mainLock.unlock();
1324:         }
1325:     }
1326: 
1327:     /**
1328:      * Returns the maximum allowed number of threads.
1329:      *
1330:      * @return the maximum allowed number of threads
1331:      * @see #setMaximumPoolSize
1332:      */
1333:     public int getMaximumPoolSize() {
1334:         return maximumPoolSize;
1335:     }
1336: 
1337:     /**
1338:      * Sets the time limit for which threads may remain idle before
1339:      * being terminated.  If there are more than the core number of
1340:      * threads currently in the pool, after waiting this amount of
1341:      * time without processing a task, excess threads will be
1342:      * terminated.  This overrides any value set in the constructor.
1343:      * @param time the time to wait.  A time value of zero will cause
1344:      * excess threads to terminate immediately after executing tasks.
1345:      * @param unit  the time unit of the time argument
1346:      * @throws IllegalArgumentException if time less than zero or
1347:      * if time is zero and allowsCoreThreadTimeOut
1348:      * @see #getKeepAliveTime
1349:      */
1350:     public void setKeepAliveTime(long time, TimeUnit unit) {
1351:         if (time < 0)
1352:             throw new IllegalArgumentException();
1353:         if (time == 0 && allowsCoreThreadTimeOut())
1354:             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
1355:         this.keepAliveTime = unit.toNanos(time);
1356:     }
1357: 
1358:     /**
1359:      * Returns the thread keep-alive time, which is the amount of time
1360:      * which threads in excess of the core pool size may remain
1361:      * idle before being terminated.
1362:      *
1363:      * @param unit the desired time unit of the result
1364:      * @return the time limit
1365:      * @see #setKeepAliveTime
1366:      */
1367:     public long getKeepAliveTime(TimeUnit unit) {
1368:         return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1369:     }
1370: 
1371:     /* Statistics */
1372: 
1373:     /**
1374:      * Returns the current number of threads in the pool.
1375:      *
1376:      * @return the number of threads
1377:      */
1378:     public int getPoolSize() {
1379:         return poolSize;
1380:     }
1381: 
1382:     /**
1383:      * Returns the approximate number of threads that are actively
1384:      * executing tasks.
1385:      *
1386:      * @return the number of threads
1387:      */
1388:     public int getActiveCount() {
1389:         final ReentrantLock mainLock = this.mainLock;
1390:         mainLock.lock();
1391:         try {
1392:             int n = 0;
1393:             for (Worker w : workers) {
1394:                 if (w.isActive())
1395:                     ++n;
1396:             }
1397:             return n;
1398:         } finally {
1399:             mainLock.unlock();
1400:         }
1401:     }
1402: 
1403:     /**
1404:      * Returns the largest number of threads that have ever
1405:      * simultaneously been in the pool.
1406:      *
1407:      * @return the number of threads
1408:      */
1409:     public int getLargestPoolSize() {
1410:         final ReentrantLock mainLock = this.mainLock;
1411:         mainLock.lock();
1412:         try {
1413:             return largestPoolSize;
1414:         } finally {
1415:             mainLock.unlock();
1416:         }
1417:     }
1418: 
1419:     /**
1420:      * Returns the approximate total number of tasks that have been
1421:      * scheduled for execution. Because the states of tasks and
1422:      * threads may change dynamically during computation, the returned
1423:      * value is only an approximation, but one that does not ever
1424:      * decrease across successive calls.
1425:      *
1426:      * @return the number of tasks
1427:      */
1428:     public long getTaskCount() {
1429:         final ReentrantLock mainLock = this.mainLock;
1430:         mainLock.lock();
1431:         try {
1432:             long n = completedTaskCount;
1433:             for (Worker w : workers) {
1434:                 n += w.completedTasks;
1435:                 if (w.isActive())
1436:                     ++n;
1437:             }
1438:             return n + workQueue.size();
1439:         } finally {
1440:             mainLock.unlock();
1441:         }
1442:     }
1443: 
1444:     /**
1445:      * Returns the approximate total number of tasks that have
1446:      * completed execution. Because the states of tasks and threads
1447:      * may change dynamically during computation, the returned value
1448:      * is only an approximation, but one that does not ever decrease
1449:      * across successive calls.
1450:      *
1451:      * @return the number of tasks
1452:      */
1453:     public long getCompletedTaskCount() {
1454:         final ReentrantLock mainLock = this.mainLock;
1455:         mainLock.lock();
1456:         try {
1457:             long n = completedTaskCount;
1458:             for (Worker w : workers)
1459:                 n += w.completedTasks;
1460:             return n;
1461:         } finally {
1462:             mainLock.unlock();
1463:         }
1464:     }
1465: 
1466:     /**
1467:      * Method invoked prior to executing the given Runnable in the
1468:      * given thread.  This method is invoked by thread <tt>t</tt> that
1469:      * will execute task <tt>r</tt>, and may be used to re-initialize
1470:      * ThreadLocals, or to perform logging.
1471:      *
1472:      * <p>This implementation does nothing, but may be customized in
1473:      * subclasses. Note: To properly nest multiple overridings, subclasses
1474:      * should generally invoke <tt>super.beforeExecute</tt> at the end of
1475:      * this method.
1476:      *
1477:      * @param t the thread that will run task r.
1478:      * @param r the task that will be executed.
1479:      */
1480:     protected void beforeExecute(Thread t, Runnable r) { }
1481: 
1482:     /**
1483:      * Method invoked upon completion of execution of the given Runnable.
1484:      * This method is invoked by the thread that executed the task. If
1485:      * non-null, the Throwable is the uncaught <tt>RuntimeException</tt>
1486:      * or <tt>Error</tt> that caused execution to terminate abruptly.
1487:      *
1488:      * <p><b>Note:</b> When actions are enclosed in tasks (such as
1489:      * {@link FutureTask}) either explicitly or via methods such as
1490:      * <tt>submit</tt>, these task objects catch and maintain
1491:      * computational exceptions, and so they do not cause abrupt
1492:      * termination, and the internal exceptions are <em>not</em>
1493:      * passed to this method.
1494:      *
1495:      * <p>This implementation does nothing, but may be customized in
1496:      * subclasses. Note: To properly nest multiple overridings, subclasses
1497:      * should generally invoke <tt>super.afterExecute</tt> at the
1498:      * beginning of this method.
1499:      *
1500:      * @param r the runnable that has completed.
1501:      * @param t the exception that caused termination, or null if
1502:      * execution completed normally.
1503:      */
1504:     protected void afterExecute(Runnable r, Throwable t) { }
1505: 
1506:     /**
1507:      * Method invoked when the Executor has terminated.  Default
1508:      * implementation does nothing. Note: To properly nest multiple
1509:      * overridings, subclasses should generally invoke
1510:      * <tt>super.terminated</tt> within this method.
1511:      */
1512:     protected void terminated() { }
1513: 
1514:     /**
1515:      * A handler for rejected tasks that runs the rejected task
1516:      * directly in the calling thread of the <tt>execute</tt> method,
1517:      * unless the executor has been shut down, in which case the task
1518:      * is discarded.
1519:      */
1520:     public static class CallerRunsPolicy implements RejectedExecutionHandler {
1521:         /**
1522:          * Creates a <tt>CallerRunsPolicy</tt>.
1523:          */
1524:         public CallerRunsPolicy() { }
1525: 
1526:         /**
1527:          * Executes task r in the caller's thread, unless the executor
1528:          * has been shut down, in which case the task is discarded.
1529:          * @param r the runnable task requested to be executed
1530:          * @param e the executor attempting to execute this task
1531:          */
1532:         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1533:             if (!e.isShutdown()) {
1534:                 r.run();
1535:             }
1536:         }
1537:     }
1538: 
1539:     /**
1540:      * A handler for rejected tasks that throws a
1541:      * <tt>RejectedExecutionException</tt>.
1542:      */
1543:     public static class AbortPolicy implements RejectedExecutionHandler {
1544:         /**
1545:          * Creates an <tt>AbortPolicy</tt>.
1546:          */
1547:         public AbortPolicy() { }
1548: 
1549:         /**
1550:          * Always throws RejectedExecutionException.
1551:          * @param r the runnable task requested to be executed
1552:          * @param e the executor attempting to execute this task
1553:          * @throws RejectedExecutionException always.
1554:          */
1555:         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1556:             throw new RejectedExecutionException();
1557:         }
1558:     }
1559: 
1560:     /**
1561:      * A handler for rejected tasks that silently discards the
1562:      * rejected task.
1563:      */
1564:     public static class DiscardPolicy implements RejectedExecutionHandler {
1565:         /**
1566:          * Creates a <tt>DiscardPolicy</tt>.
1567:          */
1568:         public DiscardPolicy() { }
1569: 
1570:         /**
1571:          * Does nothing, which has the effect of discarding task r.
1572:          * @param r the runnable task requested to be executed
1573:          * @param e the executor attempting to execute this task
1574:          */
1575:         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1576:         }
1577:     }
1578: 
1579:     /**
1580:      * A handler for rejected tasks that discards the oldest unhandled
1581:      * request and then retries <tt>execute</tt>, unless the executor
1582:      * is shut down, in which case the task is discarded.
1583:      */
1584:     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1585:         /**
1586:          * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
1587:          */
1588:         public DiscardOldestPolicy() { }
1589: 
1590:         /**
1591:          * Obtains and ignores the next task that the executor
1592:          * would otherwise execute, if one is immediately available,
1593:          * and then retries execution of task r, unless the executor
1594:          * is shut down, in which case task r is instead discarded.
1595:          * @param r the runnable task requested to be executed
1596:          * @param e the executor attempting to execute this task
1597:          */
1598:         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1599:             if (!e.isShutdown()) {
1600:                 e.getQueue().poll();
1601:                 e.execute(r);
1602:             }
1603:         }
1604:     }
1605: }