// GNU Classpath (0.99.1-pre)
/*
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.*;

/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa. A synchronous queue does not have any
 * internal capacity, not even a capacity of one. You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;
 * you cannot iterate as there is nothing to iterate. The
 * <em>head</em> of the queue is the element that the first queued
 * inserting thread is trying to add to the queue; if there is no such
 * queued thread then no element is available for removal and
 * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other
 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
 * <tt>SynchronousQueue</tt> acts as an empty collection. This queue
 * does not permit <tt>null</tt> elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p> This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads. By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order.
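 *
 * <p>A minimal handoff sketch follows. It is illustrative only and is
 * not part of the original documentation; the class name
 * <tt>HandoffSketch</tt> and the string <tt>"task"</tt> are
 * hypothetical. The producer thread blocks in <tt>put</tt> until the
 * main thread arrives at <tt>take</tt>:
 *
 * <pre> {@code
 * import java.util.concurrent.SynchronousQueue;
 *
 * class HandoffSketch {
 *   public static void main(String[] args) throws InterruptedException {
 *     // true selects the fair (FIFO) policy described above
 *     final SynchronousQueue<String> q = new SynchronousQueue<String>(true);
 *     Thread producer = new Thread(new Runnable() {
 *       public void run() {
 *         try {
 *           q.put("task");   // blocks until a consumer takes the item
 *         } catch (InterruptedException ie) {
 *           Thread.currentThread().interrupt();   // preserve interrupt status
 *         }
 *       }
 *     });
 *     producer.start();
 *     String s = q.take();   // rendezvous with the producer
 *     producer.join();
 *     System.out.println(s);
 *   }
 * }}</pre>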
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * This class implements extensions of the dual stack and dual
     * queue algorithms described in "Nonblocking Concurrent Objects
     * with Condition Synchronization", by W. N. Scherer III and
     * M. L. Scott. 18th Annual Conf. on Distributed Computing,
     * Oct. 2004 (see also
     * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode. The performance of the two is generally
     * similar. Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.
     *
     * A dual queue (and similarly stack) is one that at any given
     * time either holds "data" -- items provided by put operations,
     * or "requests" -- slots representing take operations, or is
     * empty. A call to "fulfill" (i.e., a call requesting an item
     * from a queue holding data or vice versa) dequeues a
     * complementary node. The most interesting feature of these
     * queues is that any operation can figure out which mode the
     * queue is in, and act accordingly without needing locks.
     *
     * Both the queue and stack extend abstract class Transferer
     * defining the single method transfer that does a put or a
     * take.
     * These are unified into a single method because in dual
     * data structures, the put and take operations are symmetrical,
     * so nearly all code can be combined. The resulting transfer
     * methods are on the long side, but are easier to follow than
     * they would be if broken up into nearly-duplicated parts.
     *
     * The queue and stack data structures share many conceptual
     * similarities but very few concrete details. For simplicity,
     * they are kept distinct so that they can later evolve
     * separately.
     *
     * The algorithms here differ from the versions in the above paper
     * in extending them for use in synchronous queues, as well as
     * dealing with cancellation. The main differences include:
     *
     *  1. The original algorithms used bit-marked pointers, but
     *     the ones here use mode bits in nodes, leading to a number
     *     of further adaptations.
     *  2. SynchronousQueues must block threads waiting to become
     *     fulfilled.
     *  3. Support for cancellation via timeout and interrupts,
     *     including cleaning out cancelled nodes/threads
     *     from lists to avoid garbage retention and memory depletion.
     *
     * Blocking is mainly accomplished using LockSupport park/unpark,
     * except that nodes that appear to be the next ones to become
     * fulfilled first spin a bit (on multiprocessors only). On very
     * busy synchronous queues, spinning can dramatically improve
     * throughput. And on less busy ones, the amount of spinning is
     * small enough not to be noticeable.
     *
     * Cleaning is done in different ways in queues vs stacks. For
     * queues, we can almost always remove a node immediately in O(1)
     * time (modulo retries for consistency checks) when it is
     * cancelled. But if it may be pinned as the current tail, it must
     * wait until some subsequent cancellation.
     * For stacks, we need a potentially O(n) traversal to be sure
     * that we can remove the node, but this can run concurrently
     * with other threads accessing the stack.
     *
     * While garbage collection takes care of most node reclamation
     * issues that otherwise complicate nonblocking algorithms, care
     * is taken to "forget" references to data, other nodes, and
     * threads that might be held on to long-term by blocked
     * threads. In cases where setting to null would otherwise
     * conflict with main algorithms, this is done by changing a
     * node's link to now point to the node itself. This doesn't arise
     * much for Stack nodes (because blocked threads do not hang on to
     * old head pointers), but references in Queue nodes must be
     * aggressively forgotten to avoid reachability of everything any
     * node has ever referred to since arrival.
     */

    /**
     * Shared internal API for dual stacks and queues.
     */
    static abstract class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }

    /** The number of CPUs, for spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes.
     * Empirically, the best value seems not to vary with number of
     * CPUs (beyond 2) so is just a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /** Dual stack */
    static final class TransferStack extends Transferer {
        /*
         * This extends Scherer-Scott dual stack algorithm, differing,
         * among other ways, by using "covering" nodes rather than
         * bit-marked pointers: Fulfilling operations push on marker
         * nodes (with FULFILLING bit set in mode) to reserve a spot
         * to match a waiting node.
         */

        /* Modes for SNodes, ORed together in node fields */
        /** Node represents an unfulfilled consumer */
        static final int REQUEST    = 0;
        /** Node represents an unfulfilled producer */
        static final int DATA       = 1;
        /** Node is fulfilling another unfulfilled DATA or REQUEST */
        static final int FULFILLING = 2;

        /** Return true if m has fulfilling bit set */
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

        /** Node class for TransferStacks.
         */
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
            // Note: item and mode fields don't need to be volatile
            // since they are always written before, and read after,
            // other volatile/atomic operations.

            SNode(Object item) {
                this.item = item;
            }

            static final AtomicReferenceFieldUpdater<SNode, SNode>
                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
                (SNode.class, SNode.class, "next");

            boolean casNext(SNode cmp, SNode val) {
                return (cmp == next &&
                        nextUpdater.compareAndSet(this, cmp, val));
            }

            static final AtomicReferenceFieldUpdater<SNode, SNode>
                matchUpdater = AtomicReferenceFieldUpdater.newUpdater
                (SNode.class, SNode.class, "match");

            /**
             * Tries to match node s to this node, if so, waking up thread.
             * Fulfillers call tryMatch to identify their waiters.
             * Waiters block until they have been matched.
             *
             * @param s the node to match
             * @return true if successfully matched to s
             */
            boolean tryMatch(SNode s) {
                if (match == null &&
                    matchUpdater.compareAndSet(this, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }

            /**
             * Tries to cancel a wait by matching node to itself.
             */
            void tryCancel() {
                matchUpdater.compareAndSet(this, null, this);
            }

            boolean isCancelled() {
                return match == this;
            }
        }

        /** The head (top) of the stack */
        volatile SNode head;

        static final AtomicReferenceFieldUpdater<TransferStack, SNode>
            headUpdater = AtomicReferenceFieldUpdater.newUpdater
            (TransferStack.class, SNode.class, "head");

        boolean casHead(SNode h, SNode nh) {
            return h == head && headUpdater.compareAndSet(this, h, nh);
        }

        /**
         * Creates or resets fields of a node. Called only from transfer
         * where the node to push on stack is lazily created and
         * reused when possible to help reduce intervals between reads
         * and CASes of head and to avoid surges of garbage when CASes
         * to push nodes fail due to contention.
         */
        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue.
             *    The code for helping is essentially the same as for
             *    fulfilling, except that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null)? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return mode == REQUEST? m.item : s.item;
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (mode == REQUEST)? m.item : s.item;
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

        /**
         * Spins/blocks until node s is matched by a fulfill operation.
         *
         * @param s the waiting node
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched node, or s if cancelled
         */
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            /*
             * When a node/thread is about to block, it sets its waiter
             * field and then rechecks state at least one more time
             * before actually parking, thus covering race vs
             * fulfiller noticing that waiter is non-null so should be
             * woken.
             *
             * When invoked by nodes that appear at the point of call
             * to be at the head of the stack, calls to park are
             * preceded by spins to avoid blocking when producers and
             * consumers are arriving very close in time. This can
             * happen enough to bother only on multiprocessors.
             *
             * The order of checks for returning out of main loop
             * reflects fact that interrupts have precedence over
             * normal returns, which have precedence over
             * timeouts. (So, on timeout, one last check for match is
             * done before giving up.) Except that calls from untimed
             * SynchronousQueue.{poll/offer} don't check interrupts
             * and don't wait at all, so are trapped in transfer
             * method rather than calling awaitFulfill.
             */
            long lastTime = (timed)? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            SNode h = head;
            int spins = (shouldSpin(s)?
                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s)?
                        (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Returns true if node s is at head or there is an active
         * fulfiller.
         */
        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        }

        /**
         * Unlinks s from the stack.
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }
    }

    /** Dual Queue */
    static final class TransferQueue extends Transferer {
        /*
         * This extends Scherer-Scott dual queue algorithm, differing,
         * among other ways, by using modes within nodes rather than
         * marked pointers.
         * The algorithm is a little simpler than
         * that for stacks because fulfillers do not need explicit
         * nodes, and matching is done by CAS'ing QNode.item field
         * from non-null to null (for put) or vice versa (for take).
         */

        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            static final AtomicReferenceFieldUpdater<QNode, QNode>
                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
                (QNode.class, QNode.class, "next");

            boolean casNext(QNode cmp, QNode val) {
                return (next == cmp &&
                        nextUpdater.compareAndSet(this, cmp, val));
            }

            static final AtomicReferenceFieldUpdater<QNode, Object>
                itemUpdater = AtomicReferenceFieldUpdater.newUpdater
                (QNode.class, Object.class, "item");

            boolean casItem(Object cmp, Object val) {
                return (item == cmp &&
                        itemUpdater.compareAndSet(this, cmp, val));
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
             */
            void tryCancel(Object cmp) {
                itemUpdater.compareAndSet(this, cmp, this);
            }

            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             */
            boolean isOffList() {
                return next == this;
            }
        }

        /** Head of queue */
        transient volatile QNode head;
        /** Tail of queue */
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it cancelled.
         */
        transient volatile QNode cleanMe;

        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
            headUpdater = AtomicReferenceFieldUpdater.newUpdater
            (TransferQueue.class, QNode.class, "head");

        /**
         * Tries to cas nh as new head; if successful, unlink
         * old head's next node to avoid garbage retention.
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head && headUpdater.compareAndSet(this, h, nh))
                h.next = h; // forget old next
        }

        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
            tailUpdater = AtomicReferenceFieldUpdater.newUpdater
            (TransferQueue.class, QNode.class, "tail");

        /**
         * Tries to cas nt as new tail.
         */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                tailUpdater.compareAndSet(this, t, nt);
        }

        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
            cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
            (TransferQueue.class, QNode.class, "cleanMe");

        /**
         * Tries to CAS cleanMe slot.
         */
        boolean casCleanMe(QNode cmp, QNode val) {
            return (cleanMe == cmp &&
                    cleanMeUpdater.compareAndSet(this, cmp, val));
        }

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null)?
                        x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null)? x : e;
                }
            }
        }

        /**
         * Spins/blocks until node s is fulfilled.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            long lastTime = (timed)? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Gets rid of cancelled node s with original predecessor pred.
         */
        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node.
             * To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h)
                    return;
                QNode tn = t.next;
                if (t != tail)
                    continue;
                if (tn != null) {
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
                    return;          // Postpone cleaning s
            }
        }
    }

    /**
     * The transferer. Set only in constructor, but cannot be declared
     * as final without further complicating serialization. Since
     * this is accessed only at most once per public method, there
     * isn't a noticeable performance penalty for using volatile
     * instead of final here.
     */
    private transient volatile Transferer transferer;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = (fair)? new TransferQueue() : new TransferStack();
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);
        if (e != null)
            return (E)e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return (E)e;
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        return (E)transferer.transfer(null, true, 0);
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless the given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     *
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroeth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }

    private void readObject(final java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue();
        else
            transferer = new TransferStack();
    }

}
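
The methods above can be exercised with a small client program. The following is a minimal sketch and is not part of the class itself: the class name `SynchronousQueueDemo` and the helpers `handOff` and `looksEmpty` are hypothetical names chosen for illustration. It assumes a standard `java.util.concurrent.SynchronousQueue` and shows a blocking handoff between two threads, plus the fact that the collection-style methods treat the queue as empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {

    /**
     * Hypothetical helper: a producer thread hands one item to the
     * calling thread. Returns null if the caller is interrupted.
     */
    static String handOff(final String item) {
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.put(item); // blocks until a consumer calls take()
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        try {
            String received = queue.take(); // blocks until the producer arrives
            producer.join();
            return received;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Hypothetical helper: with no other thread involved, the queue
     * behaves like an empty collection and rejects non-blocking inserts.
     */
    static boolean looksEmpty() {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        List<String> sink = new ArrayList<String>();
        return queue.isEmpty()
            && queue.size() == 0
            && queue.remainingCapacity() == 0
            && queue.peek() == null
            && queue.poll() == null          // no producer is waiting
            && !queue.offer("x")             // no consumer is waiting
            && !queue.iterator().hasNext()
            && queue.toArray().length == 0
            && queue.drainTo(sink) == 0;
    }

    public static void main(String[] args) {
        System.out.println("handed off: " + handOff("task"));
        System.out.println("looks empty: " + looksEmpty());
    }
}
```

The handoff uses `put` and `take` because the non-blocking `offer` and `poll` succeed only when a partner thread is already waiting on the other side.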