GNU Classpath (0.97.2)
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent.locks;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import sun.misc.Unsafe;

/**
 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues. This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic <tt>int</tt> value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released. Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated <tt>int</tt>
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 *
 * <p>Subclasses should be defined as non-public internal helper
 * classes that are used to implement the synchronization properties
 * of their enclosing class. Class
 * <tt>AbstractQueuedSynchronizer</tt> does not implement any
 * synchronization interface. Instead it defines methods such as
 * {@link #acquireInterruptibly} that can be invoked as
 * appropriate by concrete locks and related synchronizers to
 * implement their public methods.
 *
 * <p>This class supports either or both a default <em>exclusive</em>
 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed.
 * Shared mode acquires by multiple threads may (but need not) succeed.
 * This class does not "understand" these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue. Usually, implementation subclasses support only
 * one of these modes, but both can come into play for example in a
 * {@link ReadWriteLock}. Subclasses that support only exclusive or
 * only shared modes need not define the methods supporting the unused mode.
 *
 * <p>This class defines a nested {@link ConditionObject} class that
 * can be used as a {@link Condition} implementation by subclasses
 * supporting exclusive mode for which method {@link
 * #isHeldExclusively} reports whether synchronization is exclusively
 * held with respect to the current thread, method {@link #release}
 * invoked with the current {@link #getState} value fully releases
 * this object, and {@link #acquire}, given this saved state value,
 * eventually restores this object to its previous acquired state. No
 * <tt>AbstractQueuedSynchronizer</tt> method otherwise creates such a
 * condition, so if this constraint cannot be met, do not use it. The
 * behavior of {@link ConditionObject} depends of course on the
 * semantics of its synchronizer implementation.
 *
 * <p>This class provides inspection, instrumentation, and monitoring
 * methods for the internal queue, as well as similar methods for
 * condition objects. These can be exported as desired into classes
 * using an <tt>AbstractQueuedSynchronizer</tt> for their
 * synchronization mechanics.
 *
 * <p>Serialization of this class stores only the underlying atomic
 * integer maintaining state, so deserialized objects have empty
 * thread queues.
 * Typical subclasses requiring serializability will
 * define a <tt>readObject</tt> method that restores this to a known
 * initial state upon deserialization.
 *
 * <h3>Usage</h3>
 *
 * <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}. Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * <tt>final</tt> because they cannot be independently varied.
 *
 * <p>You may also find the inherited methods from {@link
 * AbstractOwnableSynchronizer} useful to keep track of the thread
 * owning an exclusive synchronizer. You are encouraged to use them
 * -- this enables monitoring and diagnostic tools to assist users in
 * determining which threads hold locks.
 *
 * <p>Even though this class is based on an internal FIFO queue, it
 * does not automatically enforce FIFO acquisition policies. The core
 * of exclusive synchronization takes the form:
 *
 * <pre>
 * Acquire:
 *     while (!tryAcquire(arg)) {
 *        <em>enqueue thread if it is not already queued</em>;
 *        <em>possibly block current thread</em>;
 *     }
 *
 * Release:
 *     if (tryRelease(arg))
 *        <em>unblock the first queued thread</em>;
 * </pre>
 *
 * (Shared mode is similar but may involve cascading signals.)
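 *
 * <p>As a rough sketch in the same notation, shared synchronization
 * can be pictured as follows. This is an illustration inferred from
 * the shared-mode acquire methods of this class, not a normative
 * description; the variable <tt>r</tt> is introduced here only for
 * the sketch:
 *
 * <pre>
 * AcquireShared:
 *     while ((r = tryAcquireShared(arg)) &lt; 0) {
 *        <em>enqueue thread if it is not already queued</em>;
 *        <em>possibly block current thread</em>;
 *     }
 *     <em>if r &gt; 0 and the thread had been queued, unblock its
 *     successor if that successor waits in shared mode</em>;
 *
 * ReleaseShared:
 *     if (tryReleaseShared(arg))
 *        <em>unblock the first queued thread</em>;
 * </pre>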
 *
 * <p>Because checks in acquire are invoked before enqueuing, a newly
 * acquiring thread may <em>barge</em> ahead of others that are
 * blocked and queued. However, you can, if desired, define
 * <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to disable
 * barging by internally invoking one or more of the inspection
 * methods. In particular, a strict FIFO lock can define
 * <tt>tryAcquire</tt> to immediately return <tt>false</tt> if {@link
 * #getFirstQueuedThread} does not return the current thread. A
 * normally preferable non-strict fair version can immediately return
 * <tt>false</tt> only if {@link #hasQueuedThreads} returns
 * <tt>true</tt> and <tt>getFirstQueuedThread</tt> is not the current
 * thread; or equivalently, that <tt>getFirstQueuedThread</tt> is both
 * non-null and not the current thread. Further variations are
 * possible.
 *
 * <p>Throughput and scalability are generally highest for the
 * default barging (also known as <em>greedy</em>,
 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
 * While this is not guaranteed to be fair or starvation-free, earlier
 * queued threads are allowed to recontend before later queued
 * threads, and each recontention has an unbiased chance to succeed
 * against incoming threads. Also, while acquires do not
 * "spin" in the usual sense, they may perform multiple
 * invocations of <tt>tryAcquire</tt> interspersed with other
 * computations before blocking. This gives most of the benefits of
 * spins when exclusive synchronization is only briefly held, without
 * most of the liabilities when it isn't. If so desired, you can
 * augment this by preceding calls to acquire methods with
 * "fast-path" checks, possibly prechecking {@link #hasContended}
 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
 * is likely not to be contended.
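 *
 * <p>As a minimal sketch of the non-strict fair policy described
 * above, a helper class might look as follows. The class name
 * <tt>FairSync</tt> and the zero/one state encoding are assumptions
 * made purely for illustration; they are not part of this class:
 *
 * <pre>
 * class FairSync extends AbstractQueuedSynchronizer {
 *   // Refuse to barge past a different thread that is first in the queue
 *   protected boolean tryAcquire(int acquires) {
 *     Thread first = getFirstQueuedThread();
 *     if (first != null) {
 *       if (first != Thread.currentThread())
 *         return false;
 *     }
 *     if (compareAndSetState(0, 1)) {
 *       setExclusiveOwnerThread(Thread.currentThread());
 *       return true;
 *     }
 *     return false;
 *   }
 *
 *   // Release by resetting state to zero
 *   protected boolean tryRelease(int releases) {
 *     if (getState() == 0) throw new IllegalMonitorStateException();
 *     setExclusiveOwnerThread(null);
 *     setState(0);
 *     return true;
 *   }
 * }
 * </pre>
 *
 * <p>A thread that is itself first in the queue passes the check, so
 * it can still acquire when it is woken up and retries.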
 *
 * <p>This class provides an efficient and scalable basis for
 * synchronization in part by specializing its range of use to
 * synchronizers that can rely on <tt>int</tt> state, acquire, and
 * release parameters, and an internal FIFO wait queue. When this does
 * not suffice, you can build synchronizers from a lower level using
 * {@link java.util.concurrent.atomic atomic} classes, your own custom
 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
 * support.
 *
 * <h3>Usage Examples</h3>
 *
 * <p>Here is a non-reentrant mutual exclusion lock class that uses
 * the value zero to represent the unlocked state, and one to
 * represent the locked state. While a non-reentrant lock
 * does not strictly require recording of the current owner
 * thread, this class does so anyway to make usage easier to monitor.
 * It also supports conditions and exposes
 * one of the instrumentation methods:
 *
 * <pre>
 * class Mutex implements Lock, java.io.Serializable {
 *
 *   // Our internal helper class
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     // Report whether in locked state
 *     protected boolean isHeldExclusively() {
 *       return getState() == 1;
 *     }
 *
 *     // Acquire the lock if state is zero
 *     public boolean tryAcquire(int acquires) {
 *       assert acquires == 1; // Otherwise unused
 *       if (compareAndSetState(0, 1)) {
 *         setExclusiveOwnerThread(Thread.currentThread());
 *         return true;
 *       }
 *       return false;
 *     }
 *
 *     // Release the lock by setting state to zero
 *     protected boolean tryRelease(int releases) {
 *       assert releases == 1; // Otherwise unused
 *       if (getState() == 0) throw new IllegalMonitorStateException();
 *       setExclusiveOwnerThread(null);
 *       setState(0);
 *       return true;
 *     }
 *
 *     // Provide a Condition
 *     Condition newCondition() { return new ConditionObject(); }
 *
 *     // Deserialize properly
 *     private void readObject(ObjectInputStream s)
 *         throws IOException, ClassNotFoundException {
 *       s.defaultReadObject();
 *       setState(0); // reset to unlocked state
 *     }
 *   }
 *
 *   // The sync object does all the hard work. We just forward to it.
 *   private final Sync sync = new Sync();
 *
 *   public void lock()                { sync.acquire(1); }
 *   public boolean tryLock()          { return sync.tryAcquire(1); }
 *   public void unlock()              { sync.release(1); }
 *   public Condition newCondition()   { return sync.newCondition(); }
 *   public boolean isLocked()         { return sync.isHeldExclusively(); }
 *   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
 *   public void lockInterruptibly() throws InterruptedException {
 *     sync.acquireInterruptibly(1);
 *   }
 *   public boolean tryLock(long timeout, TimeUnit unit)
 *       throws InterruptedException {
 *     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
 *   }
 * }
 * </pre>
 *
 * <p>Here is a latch class that is like a {@link CountDownLatch}
 * except that it only requires a single <tt>signal</tt> to
 * fire. Because a latch is non-exclusive, it uses the <tt>shared</tt>
 * acquire and release methods.
 *
 * <pre>
 * class BooleanLatch {
 *
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     boolean isSignalled() { return getState() != 0; }
 *
 *     protected int tryAcquireShared(int ignore) {
 *       return isSignalled() ? 1 : -1;
 *     }
 *
 *     protected boolean tryReleaseShared(int ignore) {
 *       setState(1);
 *       return true;
 *     }
 *   }
 *
 *   private final Sync sync = new Sync();
 *   public boolean isSignalled() { return sync.isSignalled(); }
 *   public void signal()         { sync.releaseShared(1); }
 *   public void await() throws InterruptedException {
 *     sync.acquireSharedInterruptibly(1);
 *   }
 * }
 * </pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * Creates a new <tt>AbstractQueuedSynchronizer</tt> instance
     * with initial synchronization state of zero.
     */
    protected AbstractQueuedSynchronizer() { }

    /**
     * Wait queue node class.
     *
     * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for
     * spinlocks. We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node. A
     * "status" field in each node keeps track of whether a thread
     * should block. A node is signalled when its predecessor
     * releases. Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though. A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend. So the currently released
     * contender thread may need to rewait.
     *
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     * <p>Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.
     *
     * <p>The "prev" links (not used in original CLH locks) are mainly
     * needed to handle cancellation. If a node is cancelled, its
     * successor is (normally) relinked to a non-cancelled
     * predecessor. For explanation of similar mechanics in the case
     * of spin locks, see the papers by Scott and Scherer at
     * http://www.cs.rochester.edu/u/scott/synchronization/
     *
     * <p>We also use "next" links to implement blocking mechanics.
     * The thread id for each node is kept in its own node, so a
     * predecessor signals the next node to wake up by traversing
     * next link to determine which thread it is. Determination of
     * successor must avoid races with newly queued nodes to set
     * the "next" fields of their predecessors. This is solved
     * when necessary by checking backwards from the atomically
     * updated "tail" when a node's successor appears to be null.
     * (Or, said differently, the next-links are an optimization
     * so that we don't usually need a backward scan.)
     *
     * <p>Cancellation introduces some conservatism to the basic
     * algorithms. Since we must poll for cancellation of other
     * nodes, we can miss noticing whether a cancelled node is
     * ahead or behind us. This is dealt with by always unparking
     * successors upon cancellation, allowing them to stabilize on
     * a new predecessor.
     *
     * <p>CLH queues need a dummy header node to get started. But
     * we don't create them on construction, because it would be wasted
     * effort if there is never contention. Instead, the node
     * is constructed and head and tail pointers are set upon first
     * contention.
     *
     * <p>Threads waiting on Conditions use the same nodes, but
     * use an additional link. Conditions only need to link nodes
     * in simple (non-concurrent) linked queues because they are
     * only accessed when exclusively held. Upon await, a node is
     * inserted into a condition queue. Upon signal, the node is
     * transferred to the main queue. A special value of status
     * field is used to mark which queue a node is on.
     *
     * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
     * Scherer and Michael Scott, along with members of JSR-166
     * expert group, for helpful ideas, discussions, and critiques
     * on the design of this class.
     */
    static final class Node {
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node until
         *               transferred. (Use of this value here
         *               has nothing to do with the other uses
         *               of the field, but simplifies mechanics.)
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes. It is modified only using
         * CAS.
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing. Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned once during enqueuing, and
         * nulled out (for sake of GC) when no longer needed. Upon
         * cancellation, we cannot adjust this field, but can notice
         * status and bypass the node if cancelled. The enq operation
         * does not assign next field of a predecessor until after
         * attachment, so seeing a null next field does not
         * necessarily mean that node is at end of queue.
         * However, if a next field appears to be null, we can scan
         * prev's from the tail to double-check.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node. Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED. Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if
         * null. Use when predecessor cannot be null.
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() { // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) { // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    /**
     * Head of the wait queue, lazily initialized. Except for
     * initialization, it is modified only via method setHead. Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.
     * Modified only via method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a <tt>volatile</tt> read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // Queuing utilities

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) {
                    tail = node;
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for the current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; fall back to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods. Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * Try to clear status in anticipation of signalling. It is
         * OK if this fails or if status is changed by waiting thread.
         */
        compareAndSetWaitStatus(node, Node.SIGNAL, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.
         * But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if propagate > 0.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        setHead(node);
        if (propagate > 0 && node.waitStatus != 0) {
            /*
             * Don't bother fully figuring out successor. If it
             * looks null, call unparkSuccessor anyway to be safe.
             */
            Node s = node.next;
            if (s == null || s.isShared())
                unparkSuccessor(node);
        }
    }

    // Utilities for various versions of acquire

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        if (node != null) { // Ignore if node doesn't exist
            node.thread = null;
            // Can use unconditional write instead of CAS here
            node.waitStatus = Node.CANCELLED;
            unparkSuccessor(node);
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.
     * Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int s = pred.waitStatus;
        if (s < 0)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;
        if (s > 0)
            /*
             * Predecessor was cancelled. Move up to its predecessor
             * and indicate retry.
             */
            node.prev = pred.prev;
        else
            /*
             * Indicate that we need a signal, but don't park yet. Caller
             * will need to retry to make sure it cannot acquire before
             * parking.
             */
            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
        return false;
    }

    /**
     * Convenience method to interrupt current thread.
     */
    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes. Each is mostly the same, but annoyingly
     * different. Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                if (nanosTimeout <= 0) {
                    cancelAcquire(node);
                    return false;
                }
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    /**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {

        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                if (nanosTimeout <= 0) {
                    cancelAcquire(node);
                    return false;
                }
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }