Source for java.util.concurrent.SynchronousQueue
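
Before the listing, a minimal usage sketch of the handoff behavior that the class Javadoc below describes: the queue has no capacity, so a non-blocking offer fails unless a consumer is already waiting, and put blocks until another thread takes the element. The class name HandoffSketch and the helper methods offerWithoutConsumer and handOff are hypothetical and exist only for this illustration; only the public SynchronousQueue API is assumed.

```java
import java.util.concurrent.SynchronousQueue;

// Illustrative only: not part of the SynchronousQueue source.
class HandoffSketch {

    /** With no consumer waiting, a non-blocking offer cannot hand off its element. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        return q.offer("x");
    }

    /** Hands item from the calling thread to a consumer thread and returns what it received. */
    static String handOff(boolean fair, String item) {
        // fair == true selects FIFO ordering of waiting threads; false is the default policy.
        SynchronousQueue<String> q = new SynchronousQueue<>(fair);
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = q.take();          // blocks until a producer arrives
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            q.put(item);                         // blocks until the consumer takes the item
            consumer.join();                     // join makes received[0] visible to this thread
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during handoff", ex);
        }
        return received[0];
    }

    public static void main(String[] args) {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        System.out.println(q.offer("x"));        // false: nobody is waiting to take
        System.out.println(q.poll());            // null: nobody is waiting to put
        System.out.println(q.isEmpty());         // true: acts as an empty collection
        System.out.println(handOff(false, "task-1"));
        System.out.println(handOff(true, "task-2"));
    }
}
```

The sketch uses one producer and one consumer, so fair and non-fair mode behave the same here; the fairness flag only matters when several threads are waiting on the same side.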

   1: /*
   2:  * Written by Doug Lea, Bill Scherer, and Michael Scott with
   3:  * assistance from members of JCP JSR-166 Expert Group and released to
   4:  * the public domain, as explained at
   5:  * http://creativecommons.org/licenses/publicdomain
   6:  */
   7: 
   8: package java.util.concurrent;
   9: import java.util.concurrent.locks.*;
  10: import java.util.concurrent.atomic.*;
  11: import java.util.*;
  12: 
  13: /**
  14:  * A {@linkplain BlockingQueue blocking queue} in which each insert
  15:  * operation must wait for a corresponding remove operation by another
  16:  * thread, and vice versa.  A synchronous queue does not have any
  17:  * internal capacity, not even a capacity of one.  You cannot
  18:  * <tt>peek</tt> at a synchronous queue because an element is only
  19:  * present when you try to remove it; you cannot insert an element
  20:  * (using any method) unless another thread is trying to remove it;
  21:  * you cannot iterate as there is nothing to iterate.  The
  22:  * <em>head</em> of the queue is the element that the first queued
  23:  * inserting thread is trying to add to the queue; if there is no such
  24:  * queued thread then no element is available for removal and
  25:  * <tt>poll()</tt> will return <tt>null</tt>.  For purposes of other
  26:  * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
  27:  * <tt>SynchronousQueue</tt> acts as an empty collection.  This queue
  28:  * does not permit <tt>null</tt> elements.
  29:  *
  30:  * <p>Synchronous queues are similar to rendezvous channels used in
  31:  * CSP and Ada. They are well suited for handoff designs, in which an
  32:  * object running in one thread must sync up with an object running
  33:  * in another thread in order to hand it some information, event, or
  34:  * task.
  35:  *
   36:  * <p>This class supports an optional fairness policy for ordering
  37:  * waiting producer and consumer threads.  By default, this ordering
  38:  * is not guaranteed. However, a queue constructed with fairness set
  39:  * to <tt>true</tt> grants threads access in FIFO order.
  40:  *
  41:  * <p>This class and its iterator implement all of the
  42:  * <em>optional</em> methods of the {@link Collection} and {@link
  43:  * Iterator} interfaces.
  44:  *
  45:  * <p>This class is a member of the
  46:  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  47:  * Java Collections Framework</a>.
  48:  *
  49:  * @since 1.5
  50:  * @author Doug Lea and Bill Scherer and Michael Scott
  51:  * @param <E> the type of elements held in this collection
  52:  */
  53: public class SynchronousQueue<E> extends AbstractQueue<E>
  54:     implements BlockingQueue<E>, java.io.Serializable {
  55:     private static final long serialVersionUID = -3223113410248163686L;
  56: 
  57:     /*
  58:      * This class implements extensions of the dual stack and dual
  59:      * queue algorithms described in "Nonblocking Concurrent Objects
  60:      * with Condition Synchronization", by W. N. Scherer III and
  61:      * M. L. Scott.  18th Annual Conf. on Distributed Computing,
  62:      * Oct. 2004 (see also
  63:      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
  64:      * The (Lifo) stack is used for non-fair mode, and the (Fifo)
  65:      * queue for fair mode. The performance of the two is generally
  66:      * similar. Fifo usually supports higher throughput under
  67:      * contention but Lifo maintains higher thread locality in common
  68:      * applications.
  69:      *
  70:      * A dual queue (and similarly stack) is one that at any given
  71:      * time either holds "data" -- items provided by put operations,
  72:      * or "requests" -- slots representing take operations, or is
  73:      * empty. A call to "fulfill" (i.e., a call requesting an item
  74:      * from a queue holding data or vice versa) dequeues a
  75:      * complementary node.  The most interesting feature of these
  76:      * queues is that any operation can figure out which mode the
  77:      * queue is in, and act accordingly without needing locks.
  78:      *
  79:      * Both the queue and stack extend abstract class Transferer
  80:      * defining the single method transfer that does a put or a
  81:      * take. These are unified into a single method because in dual
  82:      * data structures, the put and take operations are symmetrical,
  83:      * so nearly all code can be combined. The resulting transfer
  84:      * methods are on the long side, but are easier to follow than
  85:      * they would be if broken up into nearly-duplicated parts.
  86:      *
  87:      * The queue and stack data structures share many conceptual
  88:      * similarities but very few concrete details. For simplicity,
  89:      * they are kept distinct so that they can later evolve
  90:      * separately.
  91:      *
  92:      * The algorithms here differ from the versions in the above paper
  93:      * in extending them for use in synchronous queues, as well as
  94:      * dealing with cancellation. The main differences include:
  95:      *
  96:      *  1. The original algorithms used bit-marked pointers, but
  97:      *     the ones here use mode bits in nodes, leading to a number
  98:      *     of further adaptations.
  99:      *  2. SynchronousQueues must block threads waiting to become
 100:      *     fulfilled.
 101:      *  3. Support for cancellation via timeout and interrupts,
 102:      *     including cleaning out cancelled nodes/threads
 103:      *     from lists to avoid garbage retention and memory depletion.
 104:      *
 105:      * Blocking is mainly accomplished using LockSupport park/unpark,
 106:      * except that nodes that appear to be the next ones to become
 107:      * fulfilled first spin a bit (on multiprocessors only). On very
 108:      * busy synchronous queues, spinning can dramatically improve
 109:      * throughput. And on less busy ones, the amount of spinning is
 110:      * small enough not to be noticeable.
 111:      *
 112:      * Cleaning is done in different ways in queues vs stacks.  For
 113:      * queues, we can almost always remove a node immediately in O(1)
 114:      * time (modulo retries for consistency checks) when it is
 115:      * cancelled. But if it may be pinned as the current tail, it must
 116:      * wait until some subsequent cancellation. For stacks, we need a
 117:      * potentially O(n) traversal to be sure that we can remove the
 118:      * node, but this can run concurrently with other threads
 119:      * accessing the stack.
 120:      *
 121:      * While garbage collection takes care of most node reclamation
 122:      * issues that otherwise complicate nonblocking algorithms, care
 123:      * is taken to "forget" references to data, other nodes, and
 124:      * threads that might be held on to long-term by blocked
 125:      * threads. In cases where setting to null would otherwise
 126:      * conflict with main algorithms, this is done by changing a
 127:      * node's link to now point to the node itself. This doesn't arise
 128:      * much for Stack nodes (because blocked threads do not hang on to
 129:      * old head pointers), but references in Queue nodes must be
 130:      * aggressively forgotten to avoid reachability of everything any
 131:      * node has ever referred to since arrival.
 132:      */
 133: 
 134:     /**
 135:      * Shared internal API for dual stacks and queues.
 136:      */
  137:     abstract static class Transferer {
 138:         /**
 139:          * Performs a put or take.
 140:          *
 141:          * @param e if non-null, the item to be handed to a consumer;
 142:          *          if null, requests that transfer return an item
 143:          *          offered by producer.
  144:          * @param timed true if this operation should time out
 145:          * @param nanos the timeout, in nanoseconds
 146:          * @return if non-null, the item provided or received; if null,
 147:          *         the operation failed due to timeout or interrupt --
 148:          *         the caller can distinguish which of these occurred
 149:          *         by checking Thread.interrupted.
 150:          */
 151:         abstract Object transfer(Object e, boolean timed, long nanos);
 152:     }
 153: 
 154:     /** The number of CPUs, for spin control */
 155:     static final int NCPUS = Runtime.getRuntime().availableProcessors();
 156: 
 157:     /**
 158:      * The number of times to spin before blocking in timed waits.
 159:      * The value is empirically derived -- it works well across a
 160:      * variety of processors and OSes. Empirically, the best value
 161:      * seems not to vary with number of CPUs (beyond 2) so is just
 162:      * a constant.
 163:      */
 164:     static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
 165: 
 166:     /**
 167:      * The number of times to spin before blocking in untimed waits.
 168:      * This is greater than timed value because untimed waits spin
 169:      * faster since they don't need to check times on each spin.
 170:      */
 171:     static final int maxUntimedSpins = maxTimedSpins * 16;
 172: 
 173:     /**
 174:      * The number of nanoseconds for which it is faster to spin
 175:      * rather than to use timed park. A rough estimate suffices.
 176:      */
 177:     static final long spinForTimeoutThreshold = 1000L;
 178: 
 179:     /** Dual stack */
 180:     static final class TransferStack extends Transferer {
 181:         /*
 182:          * This extends Scherer-Scott dual stack algorithm, differing,
 183:          * among other ways, by using "covering" nodes rather than
 184:          * bit-marked pointers: Fulfilling operations push on marker
 185:          * nodes (with FULFILLING bit set in mode) to reserve a spot
 186:          * to match a waiting node.
 187:          */
 188: 
 189:         /* Modes for SNodes, ORed together in node fields */
 190:         /** Node represents an unfulfilled consumer */
 191:         static final int REQUEST    = 0;
 192:         /** Node represents an unfulfilled producer */
 193:         static final int DATA       = 1;
 194:         /** Node is fulfilling another unfulfilled DATA or REQUEST */
 195:         static final int FULFILLING = 2;
 196: 
  197:         /** Returns true if m has the FULFILLING bit set */
 198:         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
 199: 
 200:         /** Node class for TransferStacks. */
 201:         static final class SNode {
 202:             volatile SNode next;        // next node in stack
 203:             volatile SNode match;       // the node matched to this
 204:             volatile Thread waiter;     // to control park/unpark
 205:             Object item;                // data; or null for REQUESTs
 206:             int mode;
 207:             // Note: item and mode fields don't need to be volatile
 208:             // since they are always written before, and read after,
 209:             // other volatile/atomic operations.
 210: 
 211:             SNode(Object item) {
 212:                 this.item = item;
 213:             }
 214: 
 215:             static final AtomicReferenceFieldUpdater<SNode, SNode>
 216:                 nextUpdater = AtomicReferenceFieldUpdater.newUpdater
 217:                 (SNode.class, SNode.class, "next");
 218: 
 219:             boolean casNext(SNode cmp, SNode val) {
 220:                 return (cmp == next &&
 221:                         nextUpdater.compareAndSet(this, cmp, val));
 222:             }
 223: 
 224:             static final AtomicReferenceFieldUpdater<SNode, SNode>
 225:                 matchUpdater = AtomicReferenceFieldUpdater.newUpdater
 226:                 (SNode.class, SNode.class, "match");
 227: 
 228:             /**
  229:              * Tries to match node s to this node; if successful, wakes up the waiting thread.
 230:              * Fulfillers call tryMatch to identify their waiters.
 231:              * Waiters block until they have been matched.
 232:              *
 233:              * @param s the node to match
 234:              * @return true if successfully matched to s
 235:              */
 236:             boolean tryMatch(SNode s) {
 237:                 if (match == null &&
 238:                     matchUpdater.compareAndSet(this, null, s)) {
 239:                     Thread w = waiter;
 240:                     if (w != null) {    // waiters need at most one unpark
 241:                         waiter = null;
 242:                         LockSupport.unpark(w);
 243:                     }
 244:                     return true;
 245:                 }
 246:                 return match == s;
 247:             }
 248: 
 249:             /**
 250:              * Tries to cancel a wait by matching node to itself.
 251:              */
 252:             void tryCancel() {
 253:                 matchUpdater.compareAndSet(this, null, this);
 254:             }
 255: 
 256:             boolean isCancelled() {
 257:                 return match == this;
 258:             }
 259:         }
 260: 
 261:         /** The head (top) of the stack */
 262:         volatile SNode head;
 263: 
 264:         static final AtomicReferenceFieldUpdater<TransferStack, SNode>
 265:             headUpdater = AtomicReferenceFieldUpdater.newUpdater
 266:             (TransferStack.class,  SNode.class, "head");
 267: 
 268:         boolean casHead(SNode h, SNode nh) {
 269:             return h == head && headUpdater.compareAndSet(this, h, nh);
 270:         }
 271: 
 272:         /**
 273:          * Creates or resets fields of a node. Called only from transfer
 274:          * where the node to push on stack is lazily created and
 275:          * reused when possible to help reduce intervals between reads
 276:          * and CASes of head and to avoid surges of garbage when CASes
 277:          * to push nodes fail due to contention.
 278:          */
 279:         static SNode snode(SNode s, Object e, SNode next, int mode) {
 280:             if (s == null) s = new SNode(e);
 281:             s.mode = mode;
 282:             s.next = next;
 283:             return s;
 284:         }
 285: 
 286:         /**
 287:          * Puts or takes an item.
 288:          */
 289:         Object transfer(Object e, boolean timed, long nanos) {
 290:             /*
 291:              * Basic algorithm is to loop trying one of three actions:
 292:              *
 293:              * 1. If apparently empty or already containing nodes of same
 294:              *    mode, try to push node on stack and wait for a match,
 295:              *    returning it, or null if cancelled.
 296:              *
 297:              * 2. If apparently containing node of complementary mode,
 298:              *    try to push a fulfilling node on to stack, match
 299:              *    with corresponding waiting node, pop both from
 300:              *    stack, and return matched item. The matching or
 301:              *    unlinking might not actually be necessary because of
 302:              *    other threads performing action 3:
 303:              *
 304:              * 3. If top of stack already holds another fulfilling node,
 305:              *    help it out by doing its match and/or pop
 306:              *    operations, and then continue. The code for helping
 307:              *    is essentially the same as for fulfilling, except
 308:              *    that it doesn't return the item.
 309:              */
 310: 
 311:             SNode s = null; // constructed/reused as needed
 312:             int mode = (e == null)? REQUEST : DATA;
 313: 
 314:             for (;;) {
 315:                 SNode h = head;
 316:                 if (h == null || h.mode == mode) {  // empty or same-mode
 317:                     if (timed && nanos <= 0) {      // can't wait
 318:                         if (h != null && h.isCancelled())
 319:                             casHead(h, h.next);     // pop cancelled node
 320:                         else
 321:                             return null;
 322:                     } else if (casHead(h, s = snode(s, e, h, mode))) {
 323:                         SNode m = awaitFulfill(s, timed, nanos);
 324:                         if (m == s) {               // wait was cancelled
 325:                             clean(s);
 326:                             return null;
 327:                         }
 328:                         if ((h = head) != null && h.next == s)
 329:                             casHead(h, s.next);     // help s's fulfiller
 330:                         return mode == REQUEST? m.item : s.item;
 331:                     }
 332:                 } else if (!isFulfilling(h.mode)) { // try to fulfill
 333:                     if (h.isCancelled())            // already cancelled
 334:                         casHead(h, h.next);         // pop and retry
 335:                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
 336:                         for (;;) { // loop until matched or waiters disappear
 337:                             SNode m = s.next;       // m is s's match
 338:                             if (m == null) {        // all waiters are gone
 339:                                 casHead(s, null);   // pop fulfill node
 340:                                 s = null;           // use new node next time
 341:                                 break;              // restart main loop
 342:                             }
 343:                             SNode mn = m.next;
 344:                             if (m.tryMatch(s)) {
 345:                                 casHead(s, mn);     // pop both s and m
 346:                                 return (mode == REQUEST)? m.item : s.item;
 347:                             } else                  // lost match
 348:                                 s.casNext(m, mn);   // help unlink
 349:                         }
 350:                     }
 351:                 } else {                            // help a fulfiller
 352:                     SNode m = h.next;               // m is h's match
 353:                     if (m == null)                  // waiter is gone
 354:                         casHead(h, null);           // pop fulfilling node
 355:                     else {
 356:                         SNode mn = m.next;
 357:                         if (m.tryMatch(h))          // help match
 358:                             casHead(h, mn);         // pop both h and m
 359:                         else                        // lost match
 360:                             h.casNext(m, mn);       // help unlink
 361:                     }
 362:                 }
 363:             }
 364:         }
 365: 
 366:         /**
 367:          * Spins/blocks until node s is matched by a fulfill operation.
 368:          *
 369:          * @param s the waiting node
 370:          * @param timed true if timed wait
 371:          * @param nanos timeout value
 372:          * @return matched node, or s if cancelled
 373:          */
 374:         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
 375:             /*
 376:              * When a node/thread is about to block, it sets its waiter
 377:              * field and then rechecks state at least one more time
  378:              * before actually parking, thus covering the race in which
  379:              * a fulfiller matches the node before it can see the non-null
  380:              * waiter that it should wake.
 381:              *
 382:              * When invoked by nodes that appear at the point of call
 383:              * to be at the head of the stack, calls to park are
 384:              * preceded by spins to avoid blocking when producers and
 385:              * consumers are arriving very close in time.  This can
 386:              * happen enough to bother only on multiprocessors.
 387:              *
 388:              * The order of checks for returning out of main loop
 389:              * reflects fact that interrupts have precedence over
 390:              * normal returns, which have precedence over
 391:              * timeouts. (So, on timeout, one last check for match is
 392:              * done before giving up.) Except that calls from untimed
 393:              * SynchronousQueue.{poll/offer} don't check interrupts
 394:              * and don't wait at all, so are trapped in transfer
 395:              * method rather than calling awaitFulfill.
 396:              */
 397:             long lastTime = (timed)? System.nanoTime() : 0;
 398:             Thread w = Thread.currentThread();
 399:             SNode h = head;
 400:             int spins = (shouldSpin(s)?
 401:                          (timed? maxTimedSpins : maxUntimedSpins) : 0);
 402:             for (;;) {
 403:                 if (w.isInterrupted())
 404:                     s.tryCancel();
 405:                 SNode m = s.match;
 406:                 if (m != null)
 407:                     return m;
 408:                 if (timed) {
 409:                     long now = System.nanoTime();
 410:                     nanos -= now - lastTime;
 411:                     lastTime = now;
 412:                     if (nanos <= 0) {
 413:                         s.tryCancel();
 414:                         continue;
 415:                     }
 416:                 }
 417:                 if (spins > 0)
 418:                     spins = shouldSpin(s)? (spins-1) : 0;
 419:                 else if (s.waiter == null)
 420:                     s.waiter = w; // establish waiter so can park next iter
 421:                 else if (!timed)
 422:                     LockSupport.park(this);
 423:                 else if (nanos > spinForTimeoutThreshold)
 424:                     LockSupport.parkNanos(this, nanos);
 425:             }
 426:         }
 427: 
 428:         /**
 429:          * Returns true if node s is at head or there is an active
 430:          * fulfiller.
 431:          */
 432:         boolean shouldSpin(SNode s) {
 433:             SNode h = head;
 434:             return (h == s || h == null || isFulfilling(h.mode));
 435:         }
 436: 
 437:         /**
 438:          * Unlinks s from the stack.
 439:          */
 440:         void clean(SNode s) {
 441:             s.item = null;   // forget item
 442:             s.waiter = null; // forget thread
 443: 
 444:             /*
 445:              * At worst we may need to traverse entire stack to unlink
 446:              * s. If there are multiple concurrent calls to clean, we
 447:              * might not see s if another thread has already removed
 448:              * it. But we can stop when we see any node known to
 449:              * follow s. We use s.next unless it too is cancelled, in
 450:              * which case we try the node one past. We don't check any
 451:              * further because we don't want to doubly traverse just to
 452:              * find sentinel.
 453:              */
 454: 
 455:             SNode past = s.next;
 456:             if (past != null && past.isCancelled())
 457:                 past = past.next;
 458: 
 459:             // Absorb cancelled nodes at head
 460:             SNode p;
 461:             while ((p = head) != null && p != past && p.isCancelled())
 462:                 casHead(p, p.next);
 463: 
 464:             // Unsplice embedded nodes
 465:             while (p != null && p != past) {
 466:                 SNode n = p.next;
 467:                 if (n != null && n.isCancelled())
 468:                     p.casNext(n, n.next);
 469:                 else
 470:                     p = n;
 471:             }
 472:         }
 473:     }
 474: 
 475:     /** Dual Queue */
 476:     static final class TransferQueue extends Transferer {
 477:         /*
 478:          * This extends Scherer-Scott dual queue algorithm, differing,
 479:          * among other ways, by using modes within nodes rather than
 480:          * marked pointers. The algorithm is a little simpler than
 481:          * that for stacks because fulfillers do not need explicit
 482:          * nodes, and matching is done by CAS'ing QNode.item field
 483:          * from non-null to null (for put) or vice versa (for take).
 484:          */
 485: 
 486:         /** Node class for TransferQueue. */
 487:         static final class QNode {
 488:             volatile QNode next;          // next node in queue
 489:             volatile Object item;         // CAS'ed to or from null
 490:             volatile Thread waiter;       // to control park/unpark
 491:             final boolean isData;
 492: 
 493:             QNode(Object item, boolean isData) {
 494:                 this.item = item;
 495:                 this.isData = isData;
 496:             }
 497: 
 498:             static final AtomicReferenceFieldUpdater<QNode, QNode>
 499:                 nextUpdater = AtomicReferenceFieldUpdater.newUpdater
 500:                 (QNode.class, QNode.class, "next");
 501: 
 502:             boolean casNext(QNode cmp, QNode val) {
 503:                 return (next == cmp &&
 504:                         nextUpdater.compareAndSet(this, cmp, val));
 505:             }
 506: 
 507:             static final AtomicReferenceFieldUpdater<QNode, Object>
 508:                 itemUpdater = AtomicReferenceFieldUpdater.newUpdater
 509:                 (QNode.class, Object.class, "item");
 510: 
 511:             boolean casItem(Object cmp, Object val) {
 512:                 return (item == cmp &&
 513:                         itemUpdater.compareAndSet(this, cmp, val));
 514:             }
 515: 
 516:             /**
 517:              * Tries to cancel by CAS'ing ref to this as item.
 518:              */
 519:             void tryCancel(Object cmp) {
 520:                 itemUpdater.compareAndSet(this, cmp, this);
 521:             }
 522: 
 523:             boolean isCancelled() {
 524:                 return item == this;
 525:             }
 526: 
 527:             /**
 528:              * Returns true if this node is known to be off the queue
 529:              * because its next pointer has been forgotten due to
 530:              * an advanceHead operation.
 531:              */
 532:             boolean isOffList() {
 533:                 return next == this;
 534:             }
 535:         }
 536: 
 537:         /** Head of queue */
 538:         transient volatile QNode head;
 539:         /** Tail of queue */
 540:         transient volatile QNode tail;
 541:         /**
 542:          * Reference to a cancelled node that might not yet have been
 543:          * unlinked from queue because it was the last inserted node
 544:          * when it cancelled.
 545:          */
 546:         transient volatile QNode cleanMe;
 547: 
 548:         TransferQueue() {
 549:             QNode h = new QNode(null, false); // initialize to dummy node.
 550:             head = h;
 551:             tail = h;
 552:         }
 553: 
 554:         static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
 555:             headUpdater = AtomicReferenceFieldUpdater.newUpdater
 556:             (TransferQueue.class,  QNode.class, "head");
 557: 
 558:         /**
  559:          * Tries to CAS nh as new head; if successful, self-links the
  560:          * old head's next pointer to avoid garbage retention.
 561:          */
 562:         void advanceHead(QNode h, QNode nh) {
 563:             if (h == head && headUpdater.compareAndSet(this, h, nh))
 564:                 h.next = h; // forget old next
 565:         }
 566: 
 567:         static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
 568:             tailUpdater = AtomicReferenceFieldUpdater.newUpdater
 569:             (TransferQueue.class, QNode.class, "tail");
 570: 
 571:         /**
  572:          * Tries to CAS nt as new tail.
 573:          */
 574:         void advanceTail(QNode t, QNode nt) {
 575:             if (tail == t)
 576:                 tailUpdater.compareAndSet(this, t, nt);
 577:         }
 578: 
 579:         static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
 580:             cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
 581:             (TransferQueue.class, QNode.class, "cleanMe");
 582: 
 583:         /**
 584:          * Tries to CAS cleanMe slot.
 585:          */
 586:         boolean casCleanMe(QNode cmp, QNode val) {
 587:             return (cleanMe == cmp &&
 588:                     cleanMeUpdater.compareAndSet(this, cmp, val));
 589:         }
 590: 
 591:         /**
 592:          * Puts or takes an item.
 593:          */
 594:         Object transfer(Object e, boolean timed, long nanos) {
 595:             /* Basic algorithm is to loop trying to take either of
 596:              * two actions:
 597:              *
 598:              * 1. If queue apparently empty or holding same-mode nodes,
 599:              *    try to add node to queue of waiters, wait to be
 600:              *    fulfilled (or cancelled) and return matching item.
 601:              *
 602:              * 2. If queue apparently contains waiting items, and this
 603:              *    call is of complementary mode, try to fulfill by CAS'ing
 604:              *    item field of waiting node and dequeuing it, and then
 605:              *    returning matching item.
 606:              *
 607:              * In each case, along the way, check for and try to help
 608:              * advance head and tail on behalf of other stalled/slow
 609:              * threads.
 610:              *
 611:              * The loop starts off with a null check guarding against
 612:              * seeing uninitialized head or tail values. This never
 613:              * happens in current SynchronousQueue, but could if
 614:              * callers held non-volatile/final ref to the
 615:              * transferer. The check is here anyway because it places
 616:              * null checks at top of loop, which is usually faster
 617:              * than having them implicitly interspersed.
 618:              */
 619: 
 620:             QNode s = null; // constructed/reused as needed
 621:             boolean isData = (e != null);
 622: 
 623:             for (;;) {
 624:                 QNode t = tail;
 625:                 QNode h = head;
 626:                 if (t == null || h == null)         // saw uninitialized value
 627:                     continue;                       // spin
 628: 
 629:                 if (h == t || t.isData == isData) { // empty or same-mode
 630:                     QNode tn = t.next;
 631:                     if (t != tail)                  // inconsistent read
 632:                         continue;
 633:                     if (tn != null) {               // lagging tail
 634:                         advanceTail(t, tn);
 635:                         continue;
 636:                     }
 637:                     if (timed && nanos <= 0)        // can't wait
 638:                         return null;
 639:                     if (s == null)
 640:                         s = new QNode(e, isData);
 641:                     if (!t.casNext(null, s))        // failed to link in
 642:                         continue;
 643: 
 644:                     advanceTail(t, s);              // swing tail and wait
 645:                     Object x = awaitFulfill(s, e, timed, nanos);
 646:                     if (x == s) {                   // wait was cancelled
 647:                         clean(t, s);
 648:                         return null;
 649:                     }
 650: 
 651:                     if (!s.isOffList()) {           // not already unlinked
 652:                         advanceHead(t, s);          // unlink if head
 653:                         if (x != null)              // and forget fields
 654:                             s.item = s;
 655:                         s.waiter = null;
 656:                     }
 657:                     return (x != null)? x : e;
 658: 
 659:                 } else {                            // complementary-mode
 660:                     QNode m = h.next;               // node to fulfill
 661:                     if (t != tail || m == null || h != head)
 662:                         continue;                   // inconsistent read
 663: 
 664:                     Object x = m.item;
 665:                     if (isData == (x != null) ||    // m already fulfilled
 666:                         x == m ||                   // m cancelled
 667:                         !m.casItem(x, e)) {         // lost CAS
 668:                         advanceHead(h, m);          // dequeue and retry
 669:                         continue;
 670:                     }
 671: 
 672:                     advanceHead(h, m);              // successfully fulfilled
 673:                     LockSupport.unpark(m.waiter);
 674:                     return (x != null)? x : e;
 675:                 }
 676:             }
 677:         }
 678: 
 679:         /**
 680:          * Spins/blocks until node s is fulfilled.
 681:          *
 682:          * @param s the waiting node
 683:          * @param e the comparison value for checking match
 684:          * @param timed true if timed wait
 685:          * @param nanos timeout value
 686:          * @return matched item, or s if cancelled
 687:          */
 688:         Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
 689:             /* Same idea as TransferStack.awaitFulfill */
 690:             long lastTime = (timed)? System.nanoTime() : 0;
 691:             Thread w = Thread.currentThread();
 692:             int spins = ((head.next == s) ?
 693:                          (timed? maxTimedSpins : maxUntimedSpins) : 0);
 694:             for (;;) {
 695:                 if (w.isInterrupted())
 696:                     s.tryCancel(e);
 697:                 Object x = s.item;
 698:                 if (x != e)
 699:                     return x;
 700:                 if (timed) {
 701:                     long now = System.nanoTime();
 702:                     nanos -= now - lastTime;
 703:                     lastTime = now;
 704:                     if (nanos <= 0) {
 705:                         s.tryCancel(e);
 706:                         continue;
 707:                     }
 708:                 }
 709:                 if (spins > 0)
 710:                     --spins;
 711:                 else if (s.waiter == null)
 712:                     s.waiter = w;
 713:                 else if (!timed)
 714:                     LockSupport.park(this);
 715:                 else if (nanos > spinForTimeoutThreshold)
 716:                     LockSupport.parkNanos(this, nanos);
 717:             }
 718:         }
 719: 
 720:         /**
 721:          * Gets rid of cancelled node s with original predecessor pred.
 722:          */
 723:         void clean(QNode pred, QNode s) {
 724:             s.waiter = null; // forget thread
 725:             /*
 726:              * At any given time, exactly one node on list cannot be
 727:              * deleted -- the last inserted node. To accommodate this,
 728:              * if we cannot delete s, we save its predecessor as
 729:              * "cleanMe", deleting the previously saved version
 730:              * first. At least one of node s or the node previously
 731:              * saved can always be deleted, so this always terminates.
 732:              */
 733:             while (pred.next == s) { // Return early if already unlinked
 734:                 QNode h = head;
 735:                 QNode hn = h.next;   // Absorb cancelled first node as head
 736:                 if (hn != null && hn.isCancelled()) {
 737:                     advanceHead(h, hn);
 738:                     continue;
 739:                 }
 740:                 QNode t = tail;      // Ensure consistent read for tail
 741:                 if (t == h)
 742:                     return;
 743:                 QNode tn = t.next;
 744:                 if (t != tail)
 745:                     continue;
 746:                 if (tn != null) {
 747:                     advanceTail(t, tn);
 748:                     continue;
 749:                 }
 750:                 if (s != t) {        // If not tail, try to unsplice
 751:                     QNode sn = s.next;
 752:                     if (sn == s || pred.casNext(s, sn))
 753:                         return;
 754:                 }
 755:                 QNode dp = cleanMe;
 756:                 if (dp != null) {    // Try unlinking previous cancelled node
 757:                     QNode d = dp.next;
 758:                     QNode dn;
 759:                     if (d == null ||               // d is gone or
 760:                         d == dp ||                 // d is off list or
 761:                         !d.isCancelled() ||        // d not cancelled or
 762:                         (d != t &&                 // d not tail and
 763:                          (dn = d.next) != null &&  //   has successor
 764:                          dn != d &&                //   that is on list
 765:                          dp.casNext(d, dn)))       // d unspliced
 766:                         casCleanMe(dp, null);
 767:                     if (dp == pred)
 768:                         return;      // s is already saved node
 769:                 } else if (casCleanMe(null, pred))
 770:                     return;          // Postpone cleaning s
 771:             }
 772:         }
 773:     }
 774: 
 775:     /**
 776:      * The transferer. Set only in constructor, but cannot be declared
 777:      * as final without further complicating serialization.  Since
 778:      * this is accessed at most once per public method, there
 779:      * isn't a noticeable performance penalty for using volatile
 780:      * instead of final here.
 781:      */
 782:     private transient volatile Transferer transferer;
 783: 
 784:     /**
 785:      * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
 786:      */
 787:     public SynchronousQueue() {
 788:         this(false);
 789:     }
 790: 
 791:     /**
 792:      * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
 793:      *
 794:      * @param fair if true, waiting threads contend in FIFO order for
 795:      *        access; otherwise the order is unspecified.
 796:      */
 797:     public SynchronousQueue(boolean fair) {
 798:         transferer = (fair)? new TransferQueue() : new TransferStack();
 799:     }
 800: 
 801:     /**
 802:      * Adds the specified element to this queue, waiting if necessary for
 803:      * another thread to receive it.
 804:      *
 805:      * @throws InterruptedException {@inheritDoc}
 806:      * @throws NullPointerException {@inheritDoc}
 807:      */
 808:     public void put(E o) throws InterruptedException {
 809:         if (o == null) throw new NullPointerException();
 810:         if (transferer.transfer(o, false, 0) == null) {
 811:             Thread.interrupted();
 812:             throw new InterruptedException();
 813:         }
 814:     }
 815: 
 816:     /**
 817:      * Inserts the specified element into this queue, waiting if necessary
 818:      * up to the specified wait time for another thread to receive it.
 819:      *
 820:      * @return <tt>true</tt> if successful, or <tt>false</tt> if the
 821:      *         specified waiting time elapses before a consumer appears.
 822:      * @throws InterruptedException {@inheritDoc}
 823:      * @throws NullPointerException {@inheritDoc}
 824:      */
 825:     public boolean offer(E o, long timeout, TimeUnit unit)
 826:         throws InterruptedException {
 827:         if (o == null) throw new NullPointerException();
 828:         if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
 829:             return true;
 830:         if (!Thread.interrupted())
 831:             return false;
 832:         throw new InterruptedException();
 833:     }
 834: 
 835:     /**
 836:      * Inserts the specified element into this queue, if another thread is
 837:      * waiting to receive it.
 838:      *
 839:      * @param e the element to add
 840:      * @return <tt>true</tt> if the element was added to this queue, else
 841:      *         <tt>false</tt>
 842:      * @throws NullPointerException if the specified element is null
 843:      */
 844:     public boolean offer(E e) {
 845:         if (e == null) throw new NullPointerException();
 846:         return transferer.transfer(e, true, 0) != null;
 847:     }
 848: 
 849:     /**
 850:      * Retrieves and removes the head of this queue, waiting if necessary
 851:      * for another thread to insert it.
 852:      *
 853:      * @return the head of this queue
 854:      * @throws InterruptedException {@inheritDoc}
 855:      */
 856:     public E take() throws InterruptedException {
 857:         Object e = transferer.transfer(null, false, 0);
 858:         if (e != null)
 859:             return (E)e;
 860:         Thread.interrupted();
 861:         throw new InterruptedException();
 862:     }
 863: 
 864:     /**
 865:      * Retrieves and removes the head of this queue, waiting
 866:      * if necessary up to the specified wait time, for another thread
 867:      * to insert it.
 868:      *
 869:      * @return the head of this queue, or <tt>null</tt> if the
 870:      *         specified waiting time elapses before an element is present.
 871:      * @throws InterruptedException {@inheritDoc}
 872:      */
 873:     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 874:         Object e = transferer.transfer(null, true, unit.toNanos(timeout));
 875:         if (e != null || !Thread.interrupted())
 876:             return (E)e;
 877:         throw new InterruptedException();
 878:     }
 879: 
 880:     /**
 881:      * Retrieves and removes the head of this queue, if another thread
 882:      * is currently making an element available.
 883:      *
 884:      * @return the head of this queue, or <tt>null</tt> if no
 885:      *         element is available.
 886:      */
 887:     public E poll() {
 888:         return (E)transferer.transfer(null, true, 0);
 889:     }
 890: 
 891:     /**
 892:      * Always returns <tt>true</tt>.
 893:      * A <tt>SynchronousQueue</tt> has no internal capacity.
 894:      *
 895:      * @return <tt>true</tt>
 896:      */
 897:     public boolean isEmpty() {
 898:         return true;
 899:     }
 900: 
 901:     /**
 902:      * Always returns zero.
 903:      * A <tt>SynchronousQueue</tt> has no internal capacity.
 904:      *
 905:      * @return zero.
 906:      */
 907:     public int size() {
 908:         return 0;
 909:     }
 910: 
 911:     /**
 912:      * Always returns zero.
 913:      * A <tt>SynchronousQueue</tt> has no internal capacity.
 914:      *
 915:      * @return zero.
 916:      */
 917:     public int remainingCapacity() {
 918:         return 0;
 919:     }
 920: 
 921:     /**
 922:      * Does nothing.
 923:      * A <tt>SynchronousQueue</tt> has no internal capacity.
 924:      */
 925:     public void clear() {
 926:     }
 927: 
 928:     /**
 929:      * Always returns <tt>false</tt>.
 930:      * A <tt>SynchronousQueue</tt> has no internal capacity.
 931:      *
 932:      * @param o the element
 933:      * @return <tt>false</tt>
 934:      */
 935:     public boolean contains(Object o) {
 936:         return false;
 937:     }
 938: 
 939:     /**
 940:      * Always returns <tt>false</tt>.
 941:      * A <tt>SynchronousQueue</tt> has no internal capacity.
 942:      *
 943:      * @param o the element to remove
 944:      * @return <tt>false</tt>
 945:      */
 946:     public boolean remove(Object o) {
 947:         return false;
 948:     }
 949: 
 950:     /**
 951:      * Returns <tt>false</tt> unless the given collection is empty.
 952:      * A <tt>SynchronousQueue</tt> has no internal capacity.
 953:      *
 954:      * @param c the collection
 955:      * @return <tt>false</tt> unless given collection is empty
 956:      */
 957:     public boolean containsAll(Collection<?> c) {
 958:         return c.isEmpty();
 959:     }
 960: 
 961:     /**
 962:      * Always returns <tt>false</tt>.
 963:      * A <tt>SynchronousQueue</tt> has no internal capacity.
 964:      *
 965:      * @param c the collection
 966:      * @return <tt>false</tt>
 967:      */
 968:     public boolean removeAll(Collection<?> c) {
 969:         return false;
 970:     }
 971: 
 972:     /**
 973:      * Always returns <tt>false</tt>.
 974:      * A <tt>SynchronousQueue</tt> has no internal capacity.
 975:      *
 976:      * @param c the collection
 977:      * @return <tt>false</tt>
 978:      */
 979:     public boolean retainAll(Collection<?> c) {
 980:         return false;
 981:     }
 982: 
 983:     /**
 984:      * Always returns <tt>null</tt>.
 985:      * A <tt>SynchronousQueue</tt> does not return elements
 986:      * unless actively waited on.
 987:      *
 988:      * @return <tt>null</tt>
 989:      */
 990:     public E peek() {
 991:         return null;
 992:     }
 993: 
 994:     static class EmptyIterator<E> implements Iterator<E> {
 995:         public boolean hasNext() {
 996:             return false;
 997:         }
 998:         public E next() {
 999:             throw new NoSuchElementException();
1000:         }
1001:         public void remove() {
1002:             throw new IllegalStateException();
1003:         }
1004:     }
1005: 
1006:     /**
1007:      * Returns an empty iterator in which <tt>hasNext</tt> always returns
1008:      * <tt>false</tt>.
1009:      *
1010:      * @return an empty iterator
1011:      */
1012:     public Iterator<E> iterator() {
1013:         return new EmptyIterator<E>();
1014:     }
1015: 
1016:     /**
1017:      * Returns a zero-length array.
1018:      * @return a zero-length array
1019:      */
1020:     public Object[] toArray() {
1021:         return new Object[0];
1022:     }
1023: 
1024:     /**
1025:      * Sets the zeroth element of the specified array to <tt>null</tt>
1026:      * (if the array has non-zero length) and returns it.
1027:      *
1028:      * @param a the array
1029:      * @return the specified array
1030:      * @throws NullPointerException if the specified array is null
1031:      */
1032:     public <T> T[] toArray(T[] a) {
1033:         if (a.length > 0)
1034:             a[0] = null;
1035:         return a;
1036:     }
1037: 
1038:     /**
1039:      * @throws UnsupportedOperationException {@inheritDoc}
1040:      * @throws ClassCastException            {@inheritDoc}
1041:      * @throws NullPointerException          {@inheritDoc}
1042:      * @throws IllegalArgumentException      {@inheritDoc}
1043:      */
1044:     public int drainTo(Collection<? super E> c) {
1045:         if (c == null)
1046:             throw new NullPointerException();
1047:         if (c == this)
1048:             throw new IllegalArgumentException();
1049:         int n = 0;
1050:         E e;
1051:         while ((e = poll()) != null) {
1052:             c.add(e);
1053:             ++n;
1054:         }
1055:         return n;
1056:     }
1057: 
1058:     /**
1059:      * @throws UnsupportedOperationException {@inheritDoc}
1060:      * @throws ClassCastException            {@inheritDoc}
1061:      * @throws NullPointerException          {@inheritDoc}
1062:      * @throws IllegalArgumentException      {@inheritDoc}
1063:      */
1064:     public int drainTo(Collection<? super E> c, int maxElements) {
1065:         if (c == null)
1066:             throw new NullPointerException();
1067:         if (c == this)
1068:             throw new IllegalArgumentException();
1069:         int n = 0;
1070:         E e;
1071:         while (n < maxElements && (e = poll()) != null) {
1072:             c.add(e);
1073:             ++n;
1074:         }
1075:         return n;
1076:     }
1077: 
1078:     /*
1079:      * To cope with serialization strategy in the 1.5 version of
1080:      * SynchronousQueue, we declare some unused classes and fields
1081:      * that exist solely to enable serializability across versions.
1082:      * These fields are never used, so are initialized only if this
1083:      * object is ever serialized or deserialized.
1084:      */
1085: 
1086:     static class WaitQueue implements java.io.Serializable { }
1087:     static class LifoWaitQueue extends WaitQueue {
1088:         private static final long serialVersionUID = -3633113410248163686L;
1089:     }
1090:     static class FifoWaitQueue extends WaitQueue {
1091:         private static final long serialVersionUID = -3623113410248163686L;
1092:     }
1093:     private ReentrantLock qlock;
1094:     private WaitQueue waitingProducers;
1095:     private WaitQueue waitingConsumers;
1096: 
1097:     /**
1098:      * Saves the state to a stream (that is, serializes it).
1099:      *
1100:      * @param s the stream
1101:      */
1102:     private void writeObject(java.io.ObjectOutputStream s)
1103:         throws java.io.IOException {
1104:         boolean fair = transferer instanceof TransferQueue;
1105:         if (fair) {
1106:             qlock = new ReentrantLock(true);
1107:             waitingProducers = new FifoWaitQueue();
1108:             waitingConsumers = new FifoWaitQueue();
1109:         }
1110:         else {
1111:             qlock = new ReentrantLock();
1112:             waitingProducers = new LifoWaitQueue();
1113:             waitingConsumers = new LifoWaitQueue();
1114:         }
1115:         s.defaultWriteObject();
1116:     }
1117: 
1118:     private void readObject(final java.io.ObjectInputStream s)
1119:         throws java.io.IOException, ClassNotFoundException {
1120:         s.defaultReadObject();
1121:         if (waitingProducers instanceof FifoWaitQueue)
1122:             transferer = new TransferQueue();
1123:         else
1124:             transferer = new TransferStack();
1125:     }
1126: 
1127: }
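
Usage sketch (not part of the original source). The listing above documents that put blocks until another thread receives the element, that the untimed offer succeeds only if a consumer is already waiting, and that size, isEmpty and peek always report an empty collection. The following minimal example exercises those documented behaviours. The class name HandoffSketch and its helper methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandoffSketch {

    /** Hands one message from a producer thread to the calling thread. */
    static String handOff(final String message) {
        // true selects the fair (FIFO) policy; false would work equally well here
        final SynchronousQueue<String> queue = new SynchronousQueue<String>(true);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.put(message);          // blocks until a consumer takes it
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        try {
            String received = queue.take();      // rendezvous with the producer
            producer.join();
            return received;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during handoff");
        }
    }

    /** With no consumer waiting, the untimed offer fails immediately. */
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff("task"));           // prints "task"
        System.out.println(offerWithoutConsumer());    // prints "false"

        SynchronousQueue<String> q = new SynchronousQueue<String>();
        // The queue always looks empty to Collection-style queries.
        System.out.println(q.size() + " " + q.isEmpty() + " " + q.peek()); // prints "0 true null"
        // A timed poll with no producer returns null after the timeout.
        System.out.println(q.poll(10, TimeUnit.MILLISECONDS));             // prints "null"
    }
}
```

The producer is joined only after take() returns, so the example cannot deadlock: each side of the handoff runs in its own thread, which is what a queue with no internal capacity requires.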