Source for java.util.concurrent.locks.AbstractQueuedSynchronizer

   1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent.locks;
   8: import java.util.*;
   9: import java.util.concurrent.*;
  10: import java.util.concurrent.atomic.*;
  11: import sun.misc.Unsafe;
  12: 
  13: /**
  14:  * Provides a framework for implementing blocking locks and related
  15:  * synchronizers (semaphores, events, etc) that rely on
  16:  * first-in-first-out (FIFO) wait queues.  This class is designed to
  17:  * be a useful basis for most kinds of synchronizers that rely on a
  18:  * single atomic <tt>int</tt> value to represent state. Subclasses
  19:  * must define the protected methods that change this state, and which
  20:  * define what that state means in terms of this object being acquired
  21:  * or released.  Given these, the other methods in this class carry
  22:  * out all queuing and blocking mechanics. Subclasses can maintain
  23:  * other state fields, but only the atomically updated <tt>int</tt>
  24:  * value manipulated using methods {@link #getState}, {@link
  25:  * #setState} and {@link #compareAndSetState} is tracked with respect
  26:  * to synchronization.
  27:  *
  28:  * <p>Subclasses should be defined as non-public internal helper
  29:  * classes that are used to implement the synchronization properties
  30:  * of their enclosing class.  Class
  31:  * <tt>AbstractQueuedSynchronizer</tt> does not implement any
  32:  * synchronization interface.  Instead it defines methods such as
  33:  * {@link #acquireInterruptibly} that can be invoked as
  34:  * appropriate by concrete locks and related synchronizers to
  35:  * implement their public methods.
  36:  *
  37:  * <p>This class supports either or both a default <em>exclusive</em>
  38:  * mode and a <em>shared</em> mode. When acquired in exclusive mode,
  39:  * attempted acquires by other threads cannot succeed. Shared mode
  40:  * acquires by multiple threads may (but need not) succeed. This class
  41:  * does not &quot;understand&quot; these differences except in the
  42:  * mechanical sense that when a shared mode acquire succeeds, the next
  43:  * waiting thread (if one exists) must also determine whether it can
  44:  * acquire as well. Threads waiting in the different modes share the
  45:  * same FIFO queue. Usually, implementation subclasses support only
  46:  * one of these modes, but both can come into play for example in a
  47:  * {@link ReadWriteLock}. Subclasses that support only exclusive or
  48:  * only shared modes need not define the methods supporting the unused mode.
  49:  *
  50:  * <p>This class defines a nested {@link ConditionObject} class that
  51:  * can be used as a {@link Condition} implementation by subclasses
  52:  * supporting exclusive mode for which method {@link
  53:  * #isHeldExclusively} reports whether synchronization is exclusively
  54:  * held with respect to the current thread, method {@link #release}
  55:  * invoked with the current {@link #getState} value fully releases
  56:  * this object, and {@link #acquire}, given this saved state value,
  57:  * eventually restores this object to its previous acquired state.  No
  58:  * <tt>AbstractQueuedSynchronizer</tt> method otherwise creates such a
  59:  * condition, so if this constraint cannot be met, do not use it.  The
  60:  * behavior of {@link ConditionObject} depends of course on the
  61:  * semantics of its synchronizer implementation.
  62:  *
  63:  * <p>This class provides inspection, instrumentation, and monitoring
  64:  * methods for the internal queue, as well as similar methods for
  65:  * condition objects. These can be exported as desired into classes
  66:  * using an <tt>AbstractQueuedSynchronizer</tt> for their
  67:  * synchronization mechanics.
  68:  *
  69:  * <p>Serialization of this class stores only the underlying atomic
  70:  * integer maintaining state, so deserialized objects have empty
  71:  * thread queues. Typical subclasses requiring serializability will
  72:  * define a <tt>readObject</tt> method that restores this to a known
  73:  * initial state upon deserialization.
  74:  *
  75:  * <h3>Usage</h3>
  76:  *
  77:  * <p>To use this class as the basis of a synchronizer, redefine the
  78:  * following methods, as applicable, by inspecting and/or modifying
  79:  * the synchronization state using {@link #getState}, {@link
  80:  * #setState} and/or {@link #compareAndSetState}:
  81:  *
  82:  * <ul>
  83:  * <li> {@link #tryAcquire}
  84:  * <li> {@link #tryRelease}
  85:  * <li> {@link #tryAcquireShared}
  86:  * <li> {@link #tryReleaseShared}
  87:  * <li> {@link #isHeldExclusively}
  88:  *</ul>
  89:  *
  90:  * Each of these methods by default throws {@link
  91:  * UnsupportedOperationException}.  Implementations of these methods
  92:  * must be internally thread-safe, and should in general be short and
  93:  * not block. Defining these methods is the <em>only</em> supported
  94:  * means of using this class. All other methods are declared
  95:  * <tt>final</tt> because they cannot be independently varied.
  96:  *
  97:  * <p>You may also find the inherited methods from {@link
  98:  * AbstractOwnableSynchronizer} useful to keep track of the thread
  99:  * owning an exclusive synchronizer.  You are encouraged to use them
 100:  * -- this enables monitoring and diagnostic tools to assist users in
 101:  * determining which threads hold locks.
 102:  *
 103:  * <p>Even though this class is based on an internal FIFO queue, it
 104:  * does not automatically enforce FIFO acquisition policies.  The core
 105:  * of exclusive synchronization takes the form:
 106:  *
 107:  * <pre>
 108:  * Acquire:
 109:  *     while (!tryAcquire(arg)) {
 110:  *        <em>enqueue thread if it is not already queued</em>;
 111:  *        <em>possibly block current thread</em>;
 112:  *     }
 113:  *
 114:  * Release:
 115:  *     if (tryRelease(arg))
 116:  *        <em>unblock the first queued thread</em>;
 117:  * </pre>
 118:  *
 119:  * (Shared mode is similar but may involve cascading signals.)
 120:  *
 121:  * <p>Because checks in acquire are invoked before enqueuing, a newly
 122:  * acquiring thread may <em>barge</em> ahead of others that are
 123:  * blocked and queued. However, you can, if desired, define
 124:  * <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to disable
 125:  * barging by internally invoking one or more of the inspection
 126:  * methods. In particular, a strict FIFO lock can define
 127:  * <tt>tryAcquire</tt> to immediately return <tt>false</tt> if {@link
 128:  * #getFirstQueuedThread} does not return the current thread.  A
 129:  * normally preferable non-strict fair version can immediately return
 130:  * <tt>false</tt> only if {@link #hasQueuedThreads} returns
 131:  * <tt>true</tt> and <tt>getFirstQueuedThread</tt> is not the current
 132:  * thread; or equivalently, that <tt>getFirstQueuedThread</tt> is both
 133:  * non-null and not the current thread.  Further variations are
 134:  * possible.
 135:  *
 136:  * <p>Throughput and scalability are generally highest for the
 137:  * default barging (also known as <em>greedy</em>,
 138:  * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
 139:  * While this is not guaranteed to be fair or starvation-free, earlier
 140:  * queued threads are allowed to recontend before later queued
 141:  * threads, and each recontention has an unbiased chance to succeed
 142:  * against incoming threads.  Also, while acquires do not
 143:  * &quot;spin&quot; in the usual sense, they may perform multiple
 144:  * invocations of <tt>tryAcquire</tt> interspersed with other
 145:  * computations before blocking.  This gives most of the benefits of
 146:  * spins when exclusive synchronization is only briefly held, without
 147:  * most of the liabilities when it isn't. If so desired, you can
 148:  * augment this by preceding calls to acquire methods with
 149:  * "fast-path" checks, possibly prechecking {@link #hasContended}
 150:  * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
 151:  * is likely not to be contended.
 152:  *
 153:  * <p>This class provides an efficient and scalable basis for
 154:  * synchronization in part by specializing its range of use to
 155:  * synchronizers that can rely on <tt>int</tt> state, acquire, and
 156:  * release parameters, and an internal FIFO wait queue. When this does
 157:  * not suffice, you can build synchronizers from a lower level using
 158:  * {@link java.util.concurrent.atomic atomic} classes, your own custom
 159:  * {@link java.util.Queue} classes, and {@link LockSupport} blocking
 160:  * support.
 161:  *
 162:  * <h3>Usage Examples</h3>
 163:  *
 164:  * <p>Here is a non-reentrant mutual exclusion lock class that uses
 165:  * the value zero to represent the unlocked state, and one to
 166:  * represent the locked state. While a non-reentrant lock
 167:  * does not strictly require recording of the current owner
 168:  * thread, this class does so anyway to make usage easier to monitor.
 169:  * It also supports conditions and exposes
 170:  * one of the instrumentation methods:
 171:  *
 172:  * <pre>
 173:  * class Mutex implements Lock, java.io.Serializable {
 174:  *
 175:  *   // Our internal helper class
 176:  *   private static class Sync extends AbstractQueuedSynchronizer {
 177:  *     // Report whether in locked state
 178:  *     protected boolean isHeldExclusively() {
 179:  *       return getState() == 1;
 180:  *     }
 181:  *
 182:  *     // Acquire the lock if state is zero
 183:  *     public boolean tryAcquire(int acquires) {
 184:  *       assert acquires == 1; // Otherwise unused
 185:  *       if (compareAndSetState(0, 1)) {
 186:  *         setExclusiveOwnerThread(Thread.currentThread());
 187:  *         return true;
 188:  *       }
 189:  *       return false;
 190:  *     }
 191:  *
 192:  *     // Release the lock by setting state to zero
 193:  *     protected boolean tryRelease(int releases) {
 194:  *       assert releases == 1; // Otherwise unused
 195:  *       if (getState() == 0) throw new IllegalMonitorStateException();
 196:  *       setExclusiveOwnerThread(null);
 197:  *       setState(0);
 198:  *       return true;
 199:  *     }
 200:  *
 201:  *     // Provide a Condition
 202:  *     Condition newCondition() { return new ConditionObject(); }
 203:  *
 204:  *     // Deserialize properly
 205:  *     private void readObject(ObjectInputStream s)
 206:  *         throws IOException, ClassNotFoundException {
 207:  *       s.defaultReadObject();
 208:  *       setState(0); // reset to unlocked state
 209:  *     }
 210:  *   }
 211:  *
 212:  *   // The sync object does all the hard work. We just forward to it.
 213:  *   private final Sync sync = new Sync();
 214:  *
 215:  *   public void lock()                { sync.acquire(1); }
 216:  *   public boolean tryLock()          { return sync.tryAcquire(1); }
 217:  *   public void unlock()              { sync.release(1); }
 218:  *   public Condition newCondition()   { return sync.newCondition(); }
 219:  *   public boolean isLocked()         { return sync.isHeldExclusively(); }
 220:  *   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
 221:  *   public void lockInterruptibly() throws InterruptedException {
 222:  *     sync.acquireInterruptibly(1);
 223:  *   }
 224:  *   public boolean tryLock(long timeout, TimeUnit unit)
 225:  *       throws InterruptedException {
 226:  *     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
 227:  *   }
 228:  * }
 229:  * </pre>
 230:  *
 231:  * <p>Here is a latch class that is like a {@link CountDownLatch}
 232:  * except that it only requires a single <tt>signal</tt> to
 233:  * fire. Because a latch is non-exclusive, it uses the <tt>shared</tt>
 234:  * acquire and release methods.
 235:  *
 236:  * <pre>
 237:  * class BooleanLatch {
 238:  *
 239:  *   private static class Sync extends AbstractQueuedSynchronizer {
 240:  *     boolean isSignalled() { return getState() != 0; }
 241:  *
 242:  *     protected int tryAcquireShared(int ignore) {
 243:  *       return isSignalled()? 1 : -1;
 244:  *     }
 245:  *
 246:  *     protected boolean tryReleaseShared(int ignore) {
 247:  *       setState(1);
 248:  *       return true;
 249:  *     }
 250:  *   }
 251:  *
 252:  *   private final Sync sync = new Sync();
 253:  *   public boolean isSignalled() { return sync.isSignalled(); }
 254:  *   public void signal()         { sync.releaseShared(1); }
 255:  *   public void await() throws InterruptedException {
 256:  *     sync.acquireSharedInterruptibly(1);
 257:  *   }
 258:  * }
 259:  * </pre>
 260:  *
 261:  * @since 1.5
 262:  * @author Doug Lea
 263:  */
 264: public abstract class AbstractQueuedSynchronizer
 265:     extends AbstractOwnableSynchronizer
 266:     implements java.io.Serializable {
 267: 
 268:     private static final long serialVersionUID = 7373984972572414691L;
 269: 
 270:     /**
 271:      * Creates a new <tt>AbstractQueuedSynchronizer</tt> instance
 272:      * with initial synchronization state of zero.
 273:      */
 274:     protected AbstractQueuedSynchronizer() { }
 275: 
 276:     /**
 277:      * Wait queue node class.
 278:      *
 279:      * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
 280:      * Hagersten) lock queue. CLH locks are normally used for
 281:      * spinlocks.  We instead use them for blocking synchronizers, but
 282:      * use the same basic tactic of holding some of the control
 283:      * information about a thread in the predecessor of its node.  A
 284:      * "status" field in each node keeps track of whether a thread
 285:      * should block.  A node is signalled when its predecessor
 286:      * releases.  Each node of the queue otherwise serves as a
 287:      * specific-notification-style monitor holding a single waiting
 288:      * thread. The status field does NOT control whether threads are
 289:      * granted locks etc though.  A thread may try to acquire if it is
 290:      * first in the queue. But being first does not guarantee success;
 291:      * it only gives the right to contend.  So the currently released
 292:      * contender thread may need to rewait.
 293:      *
 294:      * <p>To enqueue into a CLH lock, you atomically splice it in as new
 295:      * tail. To dequeue, you just set the head field.
 296:      * <pre>
 297:      *      +------+  prev +-----+       +-----+
 298:      * head |      | <---- |     | <---- |     |  tail
 299:      *      +------+       +-----+       +-----+
 300:      * </pre>
 301:      *
 302:      * <p>Insertion into a CLH queue requires only a single atomic
 303:      * operation on "tail", so there is a simple atomic point of
 304:      * demarcation from unqueued to queued. Similarly, dequeing
 305:      * involves only updating the "head". However, it takes a bit
 306:      * more work for nodes to determine who their successors are,
 307:      * in part to deal with possible cancellation due to timeouts
 308:      * and interrupts.
 309:      *
 310:      * <p>The "prev" links (not used in original CLH locks), are mainly
 311:      * needed to handle cancellation. If a node is cancelled, its
 312:      * successor is (normally) relinked to a non-cancelled
 313:      * predecessor. For explanation of similar mechanics in the case
 314:      * of spin locks, see the papers by Scott and Scherer at
 315:      * http://www.cs.rochester.edu/u/scott/synchronization/
 316:      *
 317:      * <p>We also use "next" links to implement blocking mechanics.
 318:      * The thread id for each node is kept in its own node, so a
 319:      * predecessor signals the next node to wake up by traversing
 320:      * next link to determine which thread it is.  Determination of
 321:      * successor must avoid races with newly queued nodes to set
 322:      * the "next" fields of their predecessors.  This is solved
 323:      * when necessary by checking backwards from the atomically
 324:      * updated "tail" when a node's successor appears to be null.
 325:      * (Or, said differently, the next-links are an optimization
 326:      * so that we don't usually need a backward scan.)
 327:      *
 328:      * <p>Cancellation introduces some conservatism to the basic
 329:      * algorithms.  Since we must poll for cancellation of other
 330:      * nodes, we can miss noticing whether a cancelled node is
 331:      * ahead or behind us. This is dealt with by always unparking
 332:      * successors upon cancellation, allowing them to stabilize on
 333:      * a new predecessor.
 334:      *
 335:      * <p>CLH queues need a dummy header node to get started. But
 336:      * we don't create them on construction, because it would be wasted
 337:      * effort if there is never contention. Instead, the node
 338:      * is constructed and head and tail pointers are set upon first
 339:      * contention.
 340:      *
 341:      * <p>Threads waiting on Conditions use the same nodes, but
 342:      * use an additional link. Conditions only need to link nodes
 343:      * in simple (non-concurrent) linked queues because they are
 344:      * only accessed when exclusively held.  Upon await, a node is
 345:      * inserted into a condition queue.  Upon signal, the node is
 346:      * transferred to the main queue.  A special value of status
 347:      * field is used to mark which queue a node is on.
 348:      *
 349:      * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
 350:      * Scherer and Michael Scott, along with members of JSR-166
 351:      * expert group, for helpful ideas, discussions, and critiques
 352:      * on the design of this class.
 353:      */
 354:     static final class Node {
 355:         /** waitStatus value to indicate thread has cancelled */
 356:         static final int CANCELLED =  1;
 357:         /** waitStatus value to indicate successor's thread needs unparking */
 358:         static final int SIGNAL    = -1;
 359:         /** waitStatus value to indicate thread is waiting on condition */
 360:         static final int CONDITION = -2;
 361:         /** Marker to indicate a node is waiting in shared mode */
 362:         static final Node SHARED = new Node();
 363:         /** Marker to indicate a node is waiting in exclusive mode */
 364:         static final Node EXCLUSIVE = null;
 365: 
 366:         /**
 367:          * Status field, taking on only the values:
 368:          *   SIGNAL:     The successor of this node is (or will soon be)
 369:          *               blocked (via park), so the current node must
 370:          *               unpark its successor when it releases or
 371:          *               cancels. To avoid races, acquire methods must
 372:          *               first indicate they need a signal,
 373:          *               then retry the atomic acquire, and then,
 374:          *               on failure, block.
 375:          *   CANCELLED:  This node is cancelled due to timeout or interrupt.
 376:          *               Nodes never leave this state. In particular,
 377:          *               a thread with cancelled node never again blocks.
 378:          *   CONDITION:  This node is currently on a condition queue.
 379:          *               It will not be used as a sync queue node until
 380:          *               transferred. (Use of this value here
 381:          *               has nothing to do with the other uses
 382:          *               of the field, but simplifies mechanics.)
 383:          *   0:          None of the above
 384:          *
 385:          * The values are arranged numerically to simplify use.
 386:          * Non-negative values mean that a node doesn't need to
 387:          * signal. So, most code doesn't need to check for particular
 388:          * values, just for sign.
 389:          *
 390:          * The field is initialized to 0 for normal sync nodes, and
 391:          * CONDITION for condition nodes.  It is modified only using
 392:          * CAS.
 393:          */
 394:         volatile int waitStatus;
 395: 
 396:         /**
 397:          * Link to predecessor node that current node/thread relies on
 398:          * for checking waitStatus. Assigned during enqueing, and nulled
 399:          * out (for sake of GC) only upon dequeuing.  Also, upon
 400:          * cancellation of a predecessor, we short-circuit while
 401:          * finding a non-cancelled one, which will always exist
 402:          * because the head node is never cancelled: A node becomes
 403:          * head only as a result of successful acquire. A
 404:          * cancelled thread never succeeds in acquiring, and a thread only
 405:          * cancels itself, not any other node.
 406:          */
 407:         volatile Node prev;
 408: 
 409:         /**
 410:          * Link to the successor node that the current node/thread
 411:          * unparks upon release. Assigned once during enqueuing, and
 412:          * nulled out (for sake of GC) when no longer needed.  Upon
 413:          * cancellation, we cannot adjust this field, but can notice
 414:          * status and bypass the node if cancelled.  The enq operation
 415:          * does not assign next field of a predecessor until after
 416:          * attachment, so seeing a null next field does not
 417:          * necessarily mean that node is at end of queue. However, if
 418:          * a next field appears to be null, we can scan prev's from
 419:          * the tail to double-check.
 420:          */
 421:         volatile Node next;
 422: 
 423:         /**
 424:          * The thread that enqueued this node.  Initialized on
 425:          * construction and nulled out after use.
 426:          */
 427:         volatile Thread thread;
 428: 
 429:         /**
 430:          * Link to next node waiting on condition, or the special
 431:          * value SHARED.  Because condition queues are accessed only
 432:          * when holding in exclusive mode, we just need a simple
 433:          * linked queue to hold nodes while they are waiting on
 434:          * conditions. They are then transferred to the queue to
 435:          * re-acquire. And because conditions can only be exclusive,
 436:          * we save a field by using special value to indicate shared
 437:          * mode.
 438:          */
 439:         Node nextWaiter;
 440: 
 441:         /**
 442:          * Returns true if node is waiting in shared mode
 443:          */
 444:         final boolean isShared() {
 445:             return nextWaiter == SHARED;
 446:         }
 447: 
 448:         /**
 449:          * Returns previous node, or throws NullPointerException if
 450:          * null.  Use when predecessor cannot be null.
 451:          * @return the predecessor of this node
 452:          */
 453:         final Node predecessor() throws NullPointerException {
 454:             Node p = prev;
 455:             if (p == null)
 456:                 throw new NullPointerException();
 457:             else
 458:                 return p;
 459:         }
 460: 
 461:         Node() {    // Used to establish initial head or SHARED marker
 462:         }
 463: 
 464:         Node(Thread thread, Node mode) {     // Used by addWaiter
 465:             this.nextWaiter = mode;
 466:             this.thread = thread;
 467:         }
 468: 
 469:         Node(Thread thread, int waitStatus) { // Used by Condition
 470:             this.waitStatus = waitStatus;
 471:             this.thread = thread;
 472:         }
 473:     }
 474: 
 475:     /**
 476:      * Head of the wait queue, lazily initialized.  Except for
 477:      * initialization, it is modified only via method setHead.  Note:
 478:      * If head exists, its waitStatus is guaranteed not to be
 479:      * CANCELLED.
 480:      */
 481:     private transient volatile Node head;
 482: 
 483:     /**
 484:      * Tail of the wait queue, lazily initialized.  Modified only via
 485:      * method enq to add new wait node.
 486:      */
 487:     private transient volatile Node tail;
 488: 
 489:     /**
 490:      * The synchronization state.
 491:      */
 492:     private volatile int state;
 493: 
 494:     /**
 495:      * Returns the current value of synchronization state.
 496:      * This operation has memory semantics of a <tt>volatile</tt> read.
 497:      * @return current state value
 498:      */
 499:     protected final int getState() {
 500:         return state;
 501:     }
 502: 
 503:     /**
 504:      * Sets the value of synchronization state.
 505:      * This operation has memory semantics of a <tt>volatile</tt> write.
 506:      * @param newState the new state value
 507:      */
 508:     protected final void setState(int newState) {
 509:         state = newState;
 510:     }
 511: 
 512:     /**
 513:      * Atomically sets synchronization state to the given updated
 514:      * value if the current state value equals the expected value.
 515:      * This operation has memory semantics of a <tt>volatile</tt> read
 516:      * and write.
 517:      *
 518:      * @param expect the expected value
 519:      * @param update the new value
 520:      * @return true if successful. False return indicates that the actual
 521:      *         value was not equal to the expected value.
 522:      */
 523:     protected final boolean compareAndSetState(int expect, int update) {
 524:         // See below for intrinsics setup to support this
 525:         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 526:     }
 527: 
 528:     // Queuing utilities
 529: 
 530:     /**
 531:      * The number of nanoseconds for which it is faster to spin
 532:      * rather than to use timed park. A rough estimate suffices
 533:      * to improve responsiveness with very short timeouts.
 534:      */
 535:     static final long spinForTimeoutThreshold = 1000L;
 536: 
 537:     /**
 538:      * Inserts node into queue, initializing if necessary. See picture above.
 539:      * @param node the node to insert
 540:      * @return node's predecessor
 541:      */
 542:     private Node enq(final Node node) {
 543:         for (;;) {
 544:             Node t = tail;
 545:             if (t == null) { // Must initialize
 546:                 Node h = new Node(); // Dummy header
 547:                 h.next = node;
 548:                 node.prev = h;
 549:                 if (compareAndSetHead(h)) {
 550:                     tail = node;
 551:                     return h;
 552:                 }
 553:             }
 554:             else {
 555:                 node.prev = t;
 556:                 if (compareAndSetTail(t, node)) {
 557:                     t.next = node;
 558:                     return t;
 559:                 }
 560:             }
 561:         }
 562:     }
 563: 
 564:     /**
 565:      * Creates and enqueues node for given thread and mode.
 566:      *
 567:      * @param current the thread
 568:      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 569:      * @return the new node
 570:      */
 571:     private Node addWaiter(Node mode) {
 572:         Node node = new Node(Thread.currentThread(), mode);
 573:         // Try the fast path of enq; backup to full enq on failure
 574:         Node pred = tail;
 575:         if (pred != null) {
 576:             node.prev = pred;
 577:             if (compareAndSetTail(pred, node)) {
 578:                 pred.next = node;
 579:                 return node;
 580:             }
 581:         }
 582:         enq(node);
 583:         return node;
 584:     }
 585: 
 586:     /**
 587:      * Sets head of queue to be node, thus dequeuing. Called only by
 588:      * acquire methods.  Also nulls out unused fields for sake of GC
 589:      * and to suppress unnecessary signals and traversals.
 590:      *
 591:      * @param node the node
 592:      */
 593:     private void setHead(Node node) {
 594:         head = node;
 595:         node.thread = null;
 596:         node.prev = null;
 597:     }
 598: 
 599:     /**
 600:      * Wakes up node's successor, if one exists.
 601:      *
 602:      * @param node the node
 603:      */
 604:     private void unparkSuccessor(Node node) {
 605:         /*
 606:          * Try to clear status in anticipation of signalling.  It is
 607:          * OK if this fails or if status is changed by waiting thread.
 608:          */
 609:         compareAndSetWaitStatus(node, Node.SIGNAL, 0);
 610: 
 611:         /*
 612:          * Thread to unpark is held in successor, which is normally
 613:          * just the next node.  But if cancelled or apparently null,
 614:          * traverse backwards from tail to find the actual
 615:          * non-cancelled successor.
 616:          */
 617:         Node s = node.next;
 618:         if (s == null || s.waitStatus > 0) {
 619:             s = null;
 620:             for (Node t = tail; t != null && t != node; t = t.prev)
 621:                 if (t.waitStatus <= 0)
 622:                     s = t;
 623:         }
 624:         if (s != null)
 625:             LockSupport.unpark(s.thread);
 626:     }
 627: 
 628:     /**
 629:      * Sets head of queue, and checks if successor may be waiting
 630:      * in shared mode, if so propagating if propagate > 0.
 631:      *
 632:      * @param pred the node holding waitStatus for node
 633:      * @param node the node
 634:      * @param propagate the return value from a tryAcquireShared
 635:      */
 636:     private void setHeadAndPropagate(Node node, int propagate) {
 637:         setHead(node);
 638:         if (propagate > 0 && node.waitStatus != 0) {
 639:             /*
 640:              * Don't bother fully figuring out successor.  If it
 641:              * looks null, call unparkSuccessor anyway to be safe.
 642:              */
 643:             Node s = node.next;
 644:             if (s == null || s.isShared())
 645:                 unparkSuccessor(node);
 646:         }
 647:     }
 648: 
 649:     // Utilities for various versions of acquire
 650: 
 651:     /**
 652:      * Cancels an ongoing attempt to acquire.
 653:      *
 654:      * @param node the node
 655:      */
 656:     private void cancelAcquire(Node node) {
 657:         if (node != null) { // Ignore if node doesn't exist
 658:             node.thread = null;
 659:             // Can use unconditional write instead of CAS here
 660:             node.waitStatus = Node.CANCELLED;
 661:             unparkSuccessor(node);
 662:         }
 663:     }
 664: 
 665:     /**
 666:      * Checks and updates status for a node that failed to acquire.
 667:      * Returns true if thread should block. This is the main signal
 668:      * control in all acquire loops.  Requires that pred == node.prev
 669:      *
 670:      * @param pred node's predecessor holding status
 671:      * @param node the node
 672:      * @return {@code true} if thread should block
 673:      */
 674:     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 675:         int s = pred.waitStatus;
 676:         if (s < 0)
 677:             /*
 678:              * This node has already set status asking a release
 679:              * to signal it, so it can safely park
 680:              */
 681:             return true;
 682:         if (s > 0)
 683:             /*
 684:              * Predecessor was cancelled. Move up to its predecessor
 685:              * and indicate retry.
 686:              */
 687:             node.prev = pred.prev;
 688:         else
 689:             /*
 690:              * Indicate that we need a signal, but don't park yet. Caller
 691:              * will need to retry to make sure it cannot acquire before
 692:              * parking.
 693:              */
 694:             compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
 695:         return false;
 696:     }
 697: 
 698:     /**
 699:      * Convenience method to interrupt current thread.
 700:      */
 701:     private static void selfInterrupt() {
 702:         Thread.currentThread().interrupt();
 703:     }
 704: 
 705:     /**
 706:      * Convenience method to park and then check if interrupted
 707:      *
 708:      * @return {@code true} if interrupted
 709:      */
 710:     private final boolean parkAndCheckInterrupt() {
 711:         LockSupport.park(this);
 712:         return Thread.interrupted();
 713:     }
 714: 
 715:     /*
 716:      * Various flavors of acquire, varying in exclusive/shared and
 717:      * control modes.  Each is mostly the same, but annoyingly
 718:      * different.  Only a little bit of factoring is possible due to
 719:      * interactions of exception mechanics (including ensuring that we
 720:      * cancel if tryAcquire throws exception) and other control, at
 721:      * least not without hurting performance too much.
 722:      */
 723: 
 724:     /**
 725:      * Acquires in exclusive uninterruptible mode for thread already in
 726:      * queue. Used by condition wait methods as well as acquire.
 727:      *
 728:      * @param node the node
 729:      * @param arg the acquire argument
 730:      * @return {@code true} if interrupted while waiting
 731:      */
 732:     final boolean acquireQueued(final Node node, int arg) {
 733:         try {
 734:             boolean interrupted = false;
 735:             for (;;) {
 736:                 final Node p = node.predecessor();
 737:                 if (p == head && tryAcquire(arg)) {
 738:                     setHead(node);
 739:                     p.next = null; // help GC
 740:                     return interrupted;
 741:                 }
 742:                 if (shouldParkAfterFailedAcquire(p, node) &&
 743:                     parkAndCheckInterrupt())
 744:                     interrupted = true;
 745:             }
 746:         } catch (RuntimeException ex) {
 747:             cancelAcquire(node);
 748:             throw ex;
 749:         }
 750:     }
 751: 
 752:     /**
 753:      * Acquires in exclusive interruptible mode.
 754:      * @param arg the acquire argument
 755:      */
 756:     private void doAcquireInterruptibly(int arg)
 757:         throws InterruptedException {
 758:         final Node node = addWaiter(Node.EXCLUSIVE);
 759:         try {
 760:             for (;;) {
 761:                 final Node p = node.predecessor();
 762:                 if (p == head && tryAcquire(arg)) {
 763:                     setHead(node);
 764:                     p.next = null; // help GC
 765:                     return;
 766:                 }
 767:                 if (shouldParkAfterFailedAcquire(p, node) &&
 768:                     parkAndCheckInterrupt())
 769:                     break;
 770:             }
 771:         } catch (RuntimeException ex) {
 772:             cancelAcquire(node);
 773:             throw ex;
 774:         }
 775:         // Arrive here only if interrupted
 776:         cancelAcquire(node);
 777:         throw new InterruptedException();
 778:     }
 779: 
 780:     /**
 781:      * Acquires in exclusive timed mode.
 782:      *
 783:      * @param arg the acquire argument
 784:      * @param nanosTimeout max wait time
 785:      * @return {@code true} if acquired
 786:      */
 787:     private boolean doAcquireNanos(int arg, long nanosTimeout)
 788:         throws InterruptedException {
 789:         long lastTime = System.nanoTime();
 790:         final Node node = addWaiter(Node.EXCLUSIVE);
 791:         try {
 792:             for (;;) {
 793:                 final Node p = node.predecessor();
 794:                 if (p == head && tryAcquire(arg)) {
 795:                     setHead(node);
 796:                     p.next = null; // help GC
 797:                     return true;
 798:                 }
 799:                 if (nanosTimeout <= 0) {
 800:                     cancelAcquire(node);
 801:                     return false;
 802:                 }
 803:                 if (nanosTimeout > spinForTimeoutThreshold &&
 804:                     shouldParkAfterFailedAcquire(p, node))
 805:                     LockSupport.parkNanos(this, nanosTimeout);
 806:                 long now = System.nanoTime();
 807:                 nanosTimeout -= now - lastTime;
 808:                 lastTime = now;
 809:                 if (Thread.interrupted())
 810:                     break;
 811:             }
 812:         } catch (RuntimeException ex) {
 813:             cancelAcquire(node);
 814:             throw ex;
 815:         }
 816:         // Arrive here only if interrupted
 817:         cancelAcquire(node);
 818:         throw new InterruptedException();
 819:     }
 820: 
 821:     /**
 822:      * Acquires in shared uninterruptible mode.
 823:      * @param arg the acquire argument
 824:      */
 825:     private void doAcquireShared(int arg) {
 826:         final Node node = addWaiter(Node.SHARED);
 827:         try {
 828:             boolean interrupted = false;
 829:             for (;;) {
 830:                 final Node p = node.predecessor();
 831:                 if (p == head) {
 832:                     int r = tryAcquireShared(arg);
 833:                     if (r >= 0) {
 834:                         setHeadAndPropagate(node, r);
 835:                         p.next = null; // help GC
 836:                         if (interrupted)
 837:                             selfInterrupt();
 838:                         return;
 839:                     }
 840:                 }
 841:                 if (shouldParkAfterFailedAcquire(p, node) &&
 842:                     parkAndCheckInterrupt())
 843:                     interrupted = true;
 844:             }
 845:         } catch (RuntimeException ex) {
 846:             cancelAcquire(node);
 847:             throw ex;
 848:         }
 849:     }
 850: 
 851:     /**
 852:      * Acquires in shared interruptible mode.
 853:      * @param arg the acquire argument
 854:      */
 855:     private void doAcquireSharedInterruptibly(int arg)
 856:         throws InterruptedException {
 857:         final Node node = addWaiter(Node.SHARED);
 858:         try {
 859:             for (;;) {
 860:                 final Node p = node.predecessor();
 861:                 if (p == head) {
 862:                     int r = tryAcquireShared(arg);
 863:                     if (r >= 0) {
 864:                         setHeadAndPropagate(node, r);
 865:                         p.next = null; // help GC
 866:                         return;
 867:                     }
 868:                 }
 869:                 if (shouldParkAfterFailedAcquire(p, node) &&
 870:                     parkAndCheckInterrupt())
 871:                     break;
 872:             }
 873:         } catch (RuntimeException ex) {
 874:             cancelAcquire(node);
 875:             throw ex;
 876:         }
 877:         // Arrive here only if interrupted
 878:         cancelAcquire(node);
 879:         throw new InterruptedException();
 880:     }
 881: 
 882:     /**
 883:      * Acquires in shared timed mode.
 884:      *
 885:      * @param arg the acquire argument
 886:      * @param nanosTimeout max wait time
 887:      * @return {@code true} if acquired
 888:      */
 889:     private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
 890:         throws InterruptedException {
 891: 
 892:         long lastTime = System.nanoTime();
 893:         final Node node = addWaiter(Node.SHARED);
 894:         try {
 895:             for (;;) {
 896:                 final Node p = node.predecessor();
 897:                 if (p == head) {
 898:                     int r = tryAcquireShared(arg);
 899:                     if (r >= 0) {
 900:                         setHeadAndPropagate(node, r);
 901:                         p.next = null; // help GC
 902:                         return true;
 903:                     }
 904:                 }
 905:                 if (nanosTimeout <= 0) {
 906:                     cancelAcquire(node);
 907:                     return false;
 908:                 }
 909:                 if (nanosTimeout > spinForTimeoutThreshold &&
 910:                     shouldParkAfterFailedAcquire(p, node))
 911:                     LockSupport.parkNanos(this, nanosTimeout);
 912:                 long now = System.nanoTime();
 913:                 nanosTimeout -= now - lastTime;
 914:                 lastTime = now;
 915:                 if (Thread.interrupted())
 916:                     break;
 917:             }
 918:         } catch (RuntimeException ex) {
 919:             cancelAcquire(node);
 920:             throw ex;
 921:         }
 922: