Source for java.util.concurrent.ConcurrentLinkedQueue

   1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent;
   8: import java.util.*;
   9: import java.util.concurrent.atomic.*;
  10: 
  11: 
  12: /**
  13:  * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
  14:  * This queue orders elements FIFO (first-in-first-out).
  15:  * The <em>head</em> of the queue is that element that has been on the
  16:  * queue the longest time.
  17:  * The <em>tail</em> of the queue is that element that has been on the
  18:  * queue the shortest time. New elements
  19:  * are inserted at the tail of the queue, and the queue retrieval
  20:  * operations obtain elements at the head of the queue.
  21:  * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
  22:  * many threads will share access to a common collection.
  23:  * This queue does not permit <tt>null</tt> elements.
  24:  *
  25:  * <p>This implementation employs an efficient &quot;wait-free&quot;
  26:  * algorithm based on one described in <a
  27:  * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
  28:  * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
  29:  * Algorithms</a> by Maged M. Michael and Michael L. Scott.
  30:  *
  31:  * <p>Beware that, unlike in most collections, the <tt>size</tt> method
  32:  * is <em>NOT</em> a constant-time operation. Because of the
  33:  * asynchronous nature of these queues, determining the current number
  34:  * of elements requires a traversal of the elements.
  35:  *
  36:  * <p>This class and its iterator implement all of the
  37:  * <em>optional</em> methods of the {@link Collection} and {@link
  38:  * Iterator} interfaces.
  39:  *
  40:  * <p>Memory consistency effects: As with other concurrent
  41:  * collections, actions in a thread prior to placing an object into a
  42:  * {@code ConcurrentLinkedQueue}
  43:  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
  44:  * actions subsequent to the access or removal of that element from
  45:  * the {@code ConcurrentLinkedQueue} in another thread.
  46:  *
  47:  * <p>This class is a member of the
  48:  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  49:  * Java Collections Framework</a>.
  50:  *
  51:  * @since 1.5
  52:  * @author Doug Lea
  53:  * @param <E> the type of elements held in this collection
  54:  *
  55:  */
  56: public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
  57:         implements Queue<E>, java.io.Serializable {
  58:     private static final long serialVersionUID = 196745693267521676L;
  59: 
  60:     /*
  61:      * This is a straight adaptation of Michael & Scott algorithm.
  62:      * For explanation, read the paper.  The only (minor) algorithmic
  63:      * difference is that this version supports lazy deletion of
  64:      * internal nodes (method remove(Object)) -- remove CAS'es item
  65:      * fields to null. The normal queue operations unlink but then
  66:      * pass over nodes with null item fields. Similarly, iteration
  67:      * methods ignore those with nulls.
  68:      *
  69:      * Also note that like most non-blocking algorithms in this
  70:      * package, this implementation relies on the fact that in garbage
  71:      * collected systems, there is no possibility of ABA problems due
  72:      * to recycled nodes, so there is no need to use "counted
  73:      * pointers" or related techniques seen in versions used in
  74:      * non-GC'ed settings.
  75:      */
  76: 
  77:     private static class Node<E> {
  78:         private volatile E item;
  79:         private volatile Node<E> next;
  80: 
  81:         private static final
  82:             AtomicReferenceFieldUpdater<Node, Node>
  83:             nextUpdater =
  84:             AtomicReferenceFieldUpdater.newUpdater
  85:             (Node.class, Node.class, "next");
  86:         private static final
  87:             AtomicReferenceFieldUpdater<Node, Object>
  88:             itemUpdater =
  89:             AtomicReferenceFieldUpdater.newUpdater
  90:             (Node.class, Object.class, "item");
  91: 
  92:         Node(E x) { item = x; }
  93: 
  94:         Node(E x, Node<E> n) { item = x; next = n; }
  95: 
  96:         E getItem() {
  97:             return item;
  98:         }
  99: 
 100:         boolean casItem(E cmp, E val) {
 101:             return itemUpdater.compareAndSet(this, cmp, val);
 102:         }
 103: 
 104:         void setItem(E val) {
 105:             itemUpdater.set(this, val);
 106:         }
 107: 
 108:         Node<E> getNext() {
 109:             return next;
 110:         }
 111: 
 112:         boolean casNext(Node<E> cmp, Node<E> val) {
 113:             return nextUpdater.compareAndSet(this, cmp, val);
 114:         }
 115: 
 116:         void setNext(Node<E> val) {
 117:             nextUpdater.set(this, val);
 118:         }
 119: 
 120:     }
 121: 
 122:     private static final
 123:         AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
 124:         tailUpdater =
 125:         AtomicReferenceFieldUpdater.newUpdater
 126:         (ConcurrentLinkedQueue.class, Node.class, "tail");
 127:     private static final
 128:         AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
 129:         headUpdater =
 130:         AtomicReferenceFieldUpdater.newUpdater
 131:         (ConcurrentLinkedQueue.class,  Node.class, "head");
 132: 
 133:     private boolean casTail(Node<E> cmp, Node<E> val) {
 134:         return tailUpdater.compareAndSet(this, cmp, val);
 135:     }
 136: 
 137:     private boolean casHead(Node<E> cmp, Node<E> val) {
 138:         return headUpdater.compareAndSet(this, cmp, val);
 139:     }
 140: 
 141: 
 142:     /**
 143:      * Pointer to header node, initialized to a dummy node.  The first
 144:      * actual node is at head.getNext().
 145:      */
 146:     private transient volatile Node<E> head = new Node<E>(null, null);
 147: 
 148:     /** Pointer to last node on list **/
 149:     private transient volatile Node<E> tail = head;
 150: 
 151: 
 152:     /**
 153:      * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
 154:      */
 155:     public ConcurrentLinkedQueue() {}
 156: 
 157:     /**
 158:      * Creates a <tt>ConcurrentLinkedQueue</tt>
 159:      * initially containing the elements of the given collection,
 160:      * added in traversal order of the collection's iterator.
 161:      * @param c the collection of elements to initially contain
 162:      * @throws NullPointerException if the specified collection or any
 163:      *         of its elements are null
 164:      */
 165:     public ConcurrentLinkedQueue(Collection<? extends E> c) {
 166:         for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
 167:             add(it.next());
 168:     }
 169: 
 170:     // Have to override just to update the javadoc
 171: 
 172:     /**
 173:      * Inserts the specified element at the tail of this queue.
 174:      *
 175:      * @return <tt>true</tt> (as specified by {@link Collection#add})
 176:      * @throws NullPointerException if the specified element is null
 177:      */
 178:     public boolean add(E e) {
 179:         return offer(e);
 180:     }
 181: 
 182:     /**
 183:      * Inserts the specified element at the tail of this queue.
 184:      *
 185:      * @return <tt>true</tt> (as specified by {@link Queue#offer})
 186:      * @throws NullPointerException if the specified element is null
 187:      */
 188:     public boolean offer(E e) {
 189:         if (e == null) throw new NullPointerException();
 190:         Node<E> n = new Node<E>(e, null);
 191:         for (;;) {
 192:             Node<E> t = tail;
 193:             Node<E> s = t.getNext();
 194:             if (t == tail) {
 195:                 if (s == null) {
 196:                     if (t.casNext(s, n)) {
 197:                         casTail(t, n);
 198:                         return true;
 199:                     }
 200:                 } else {
 201:                     casTail(t, s);
 202:                 }
 203:             }
 204:         }
 205:     }
 206: 
 207:     public E poll() {
 208:         for (;;) {
 209:             Node<E> h = head;
 210:             Node<E> t = tail;
 211:             Node<E> first = h.getNext();
 212:             if (h == head) {
 213:                 if (h == t) {
 214:                     if (first == null)
 215:                         return null;
 216:                     else
 217:                         casTail(t, first);
 218:                 } else if (casHead(h, first)) {
 219:                     E item = first.getItem();
 220:                     if (item != null) {
 221:                         first.setItem(null);
 222:                         return item;
 223:                     }
 224:                     // else skip over deleted item, continue loop,
 225:                 }
 226:             }
 227:         }
 228:     }
 229: 
 230:     public E peek() { // same as poll except don't remove item
 231:         for (;;) {
 232:             Node<E> h = head;
 233:             Node<E> t = tail;
 234:             Node<E> first = h.getNext();
 235:             if (h == head) {
 236:                 if (h == t) {
 237:                     if (first == null)
 238:                         return null;
 239:                     else
 240:                         casTail(t, first);
 241:                 } else {
 242:                     E item = first.getItem();
 243:                     if (item != null)
 244:                         return item;
 245:                     else // remove deleted node and continue
 246:                         casHead(h, first);
 247:                 }
 248:             }
 249:         }
 250:     }
 251: 
 252:     /**
 253:      * Returns the first actual (non-header) node on list.  This is yet
 254:      * another variant of poll/peek; here returning out the first
 255:      * node, not element (so we cannot collapse with peek() without
 256:      * introducing race.)
 257:      */
 258:     Node<E> first() {
 259:         for (;;) {
 260:             Node<E> h = head;
 261:             Node<E> t = tail;
 262:             Node<E> first = h.getNext();
 263:             if (h == head) {
 264:                 if (h == t) {
 265:                     if (first == null)
 266:                         return null;
 267:                     else
 268:                         casTail(t, first);
 269:                 } else {
 270:                     if (first.getItem() != null)
 271:                         return first;
 272:                     else // remove deleted node and continue
 273:                         casHead(h, first);
 274:                 }
 275:             }
 276:         }
 277:     }
 278: 
 279: 
 280:     /**
 281:      * Returns <tt>true</tt> if this queue contains no elements.
 282:      *
 283:      * @return <tt>true</tt> if this queue contains no elements
 284:      */
 285:     public boolean isEmpty() {
 286:         return first() == null;
 287:     }
 288: 
 289:     /**
 290:      * Returns the number of elements in this queue.  If this queue
 291:      * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
 292:      * <tt>Integer.MAX_VALUE</tt>.
 293:      *
 294:      * <p>Beware that, unlike in most collections, this method is
 295:      * <em>NOT</em> a constant-time operation. Because of the
 296:      * asynchronous nature of these queues, determining the current
 297:      * number of elements requires an O(n) traversal.
 298:      *
 299:      * @return the number of elements in this queue
 300:      */
 301:     public int size() {
 302:         int count = 0;
 303:         for (Node<E> p = first(); p != null; p = p.getNext()) {
 304:             if (p.getItem() != null) {
 305:                 // Collections.size() spec says to max out
 306:                 if (++count == Integer.MAX_VALUE)
 307:                     break;
 308:             }
 309:         }
 310:         return count;
 311:     }
 312: 
 313:     /**
 314:      * Returns <tt>true</tt> if this queue contains the specified element.
 315:      * More formally, returns <tt>true</tt> if and only if this queue contains
 316:      * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
 317:      *
 318:      * @param o object to be checked for containment in this queue
 319:      * @return <tt>true</tt> if this queue contains the specified element
 320:      */
 321:     public boolean contains(Object o) {
 322:         if (o == null) return false;
 323:         for (Node<E> p = first(); p != null; p = p.getNext()) {
 324:             E item = p.getItem();
 325:             if (item != null &&
 326:                 o.equals(item))
 327:                 return true;
 328:         }
 329:         return false;
 330:     }
 331: 
 332:     /**
 333:      * Removes a single instance of the specified element from this queue,
 334:      * if it is present.  More formally, removes an element <tt>e</tt> such
 335:      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
 336:      * elements.
 337:      * Returns <tt>true</tt> if this queue contained the specified element
 338:      * (or equivalently, if this queue changed as a result of the call).
 339:      *
 340:      * @param o element to be removed from this queue, if present
 341:      * @return <tt>true</tt> if this queue changed as a result of the call
 342:      */
 343:     public boolean remove(Object o) {
 344:         if (o == null) return false;
 345:         for (Node<E> p = first(); p != null; p = p.getNext()) {
 346:             E item = p.getItem();
 347:             if (item != null &&
 348:                 o.equals(item) &&
 349:                 p.casItem(item, null))
 350:                 return true;
 351:         }
 352:         return false;
 353:     }
 354: 
 355:     /**
 356:      * Returns an iterator over the elements in this queue in proper sequence.
 357:      * The returned iterator is a "weakly consistent" iterator that
 358:      * will never throw {@link ConcurrentModificationException},
 359:      * and guarantees to traverse elements as they existed upon
 360:      * construction of the iterator, and may (but is not guaranteed to)
 361:      * reflect any modifications subsequent to construction.
 362:      *
 363:      * @return an iterator over the elements in this queue in proper sequence
 364:      */
 365:     public Iterator<E> iterator() {
 366:         return new Itr();
 367:     }
 368: 
 369:     private class Itr implements Iterator<E> {
 370:         /**
 371:          * Next node to return item for.
 372:          */
 373:         private Node<E> nextNode;
 374: 
 375:         /**
 376:          * nextItem holds on to item fields because once we claim
 377:          * that an element exists in hasNext(), we must return it in
 378:          * the following next() call even if it was in the process of
 379:          * being removed when hasNext() was called.
 380:          */
 381:         private E nextItem;
 382: 
 383:         /**
 384:          * Node of the last returned item, to support remove.
 385:          */
 386:         private Node<E> lastRet;
 387: 
 388:         Itr() {
 389:             advance();
 390:         }
 391: 
 392:         /**
 393:          * Moves to next valid node and returns item to return for
 394:          * next(), or null if no such.
 395:          */
 396:         private E advance() {
 397:             lastRet = nextNode;
 398:             E x = nextItem;
 399: 
 400:             Node<E> p = (nextNode == null)? first() : nextNode.getNext();
 401:             for (;;) {
 402:                 if (p == null) {
 403:                     nextNode = null;
 404:                     nextItem = null;
 405:                     return x;
 406:                 }
 407:                 E item = p.getItem();
 408:                 if (item != null) {
 409:                     nextNode = p;
 410:                     nextItem = item;
 411:                     return x;
 412:                 } else // skip over nulls
 413:                     p = p.getNext();
 414:             }
 415:         }
 416: 
 417:         public boolean hasNext() {
 418:             return nextNode != null;
 419:         }
 420: 
 421:         public E next() {
 422:             if (nextNode == null) throw new NoSuchElementException();
 423:             return advance();
 424:         }
 425: 
 426:         public void remove() {
 427:             Node<E> l = lastRet;
 428:             if (l == null) throw new IllegalStateException();
 429:             // rely on a future traversal to relink.
 430:             l.setItem(null);
 431:             lastRet = null;
 432:         }
 433:     }
 434: 
 435:     /**
 436:      * Save the state to a stream (that is, serialize it).
 437:      *
 438:      * @serialData All of the elements (each an <tt>E</tt>) in
 439:      * the proper order, followed by a null
 440:      * @param s the stream
 441:      */
 442:     private void writeObject(java.io.ObjectOutputStream s)
 443:         throws java.io.IOException {
 444: 
 445:         // Write out any hidden stuff
 446:         s.defaultWriteObject();
 447: 
 448:         // Write out all elements in the proper order.
 449:         for (Node<E> p = first(); p != null; p = p.getNext()) {
 450:             Object item = p.getItem();
 451:             if (item != null)
 452:                 s.writeObject(item);
 453:         }
 454: 
 455:         // Use trailing null as sentinel
 456:         s.writeObject(null);
 457:     }
 458: 
 459:     /**
 460:      * Reconstitute the Queue instance from a stream (that is,
 461:      * deserialize it).
 462:      * @param s the stream
 463:      */
 464:     private void readObject(java.io.ObjectInputStream s)
 465:         throws java.io.IOException, ClassNotFoundException {
 466:         // Read in capacity, and any hidden stuff
 467:         s.defaultReadObject();
 468:         head = new Node<E>(null, null);
 469:         tail = head;
 470:         // Read in all elements and place in queue
 471:         for (;;) {
 472:             E item = (E)s.readObject();
 473:             if (item == null)
 474:                 break;
 475:             else
 476:                 offer(item);
 477:         }
 478:     }
 479: 
 480: }