Source for java.util.concurrent.PriorityBlockingQueue

   1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent;
   8: 
   9: import java.util.concurrent.locks.*;
  10: import java.util.*;
  11: 
  12: /**
  13:  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  14:  * the same ordering rules as class {@link PriorityQueue} and supplies
  15:  * blocking retrieval operations.  While this queue is logically
  16:  * unbounded, attempted additions may fail due to resource exhaustion
  17:  * (causing <tt>OutOfMemoryError</tt>). This class does not permit
  18:  * <tt>null</tt> elements.  A priority queue relying on {@linkplain
  19:  * Comparable natural ordering} also does not permit insertion of
  20:  * non-comparable objects (doing so results in
  21:  * <tt>ClassCastException</tt>).
  22:  *
  23:  * <p>This class and its iterator implement all of the
  24:  * <em>optional</em> methods of the {@link Collection} and {@link
  25:  * Iterator} interfaces.  The Iterator provided in method {@link
  26:  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
  27:  * the PriorityBlockingQueue in any particular order. If you need
  28:  * ordered traversal, consider using
  29:  * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
  30:  * can be used to <em>remove</em> some or all elements in priority
  31:  * order and place them in another collection.
  32:  *
  33:  * <p>Operations on this class make no guarantees about the ordering
  34:  * of elements with equal priority. If you need to enforce an
  35:  * ordering, you can define custom classes or comparators that use a
  36:  * secondary key to break ties in primary priority values.  For
  37:  * example, here is a class that applies first-in-first-out
  38:  * tie-breaking to comparable elements. To use it, you would insert a
  39:  * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
  40:  *
  41:  * <pre>
  42:  * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
  43:  *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
  44:  *   final static AtomicLong seq = new AtomicLong();
  45:  *   final long seqNum;
  46:  *   final E entry;
  47:  *   public FIFOEntry(E entry) {
  48:  *     seqNum = seq.getAndIncrement();
  49:  *     this.entry = entry;
  50:  *   }
  51:  *   public E getEntry() { return entry; }
  52:  *   public int compareTo(FIFOEntry&lt;E&gt; other) {
  53:  *     int res = entry.compareTo(other.entry);
  54:  *     if (res == 0 &amp;&amp; other.entry != this.entry)
  55:  *       res = (seqNum &lt; other.seqNum ? -1 : 1);
  56:  *     return res;
  57:  *   }
  58:  * }</pre>
  59:  *
  60:  * <p>This class is a member of the
  61:  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  62:  * Java Collections Framework</a>.
  63:  *
  64:  * @since 1.5
  65:  * @author Doug Lea
  66:  * @param <E> the type of elements held in this collection
  67:  */
  68: public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  69:     implements BlockingQueue<E>, java.io.Serializable {
  70:     private static final long serialVersionUID = 5595510919245408276L;
  71: 
  72:     private final PriorityQueue<E> q;
  73:     private final ReentrantLock lock = new ReentrantLock(true);
  74:     private final Condition notEmpty = lock.newCondition();
  75: 
  76:     /**
  77:      * Creates a <tt>PriorityBlockingQueue</tt> with the default
  78:      * initial capacity (11) that orders its elements according to
  79:      * their {@linkplain Comparable natural ordering}.
  80:      */
  81:     public PriorityBlockingQueue() {
  82:         q = new PriorityQueue<E>();
  83:     }
  84: 
  85:     /**
  86:      * Creates a <tt>PriorityBlockingQueue</tt> with the specified
  87:      * initial capacity that orders its elements according to their
  88:      * {@linkplain Comparable natural ordering}.
  89:      *
  90:      * @param initialCapacity the initial capacity for this priority queue
  91:      * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
  92:      *         than 1
  93:      */
  94:     public PriorityBlockingQueue(int initialCapacity) {
  95:         q = new PriorityQueue<E>(initialCapacity, null);
  96:     }
  97: 
  98:     /**
  99:      * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
 100:      * capacity that orders its elements according to the specified
 101:      * comparator.
 102:      *
 103:      * @param initialCapacity the initial capacity for this priority queue
 104:      * @param  comparator the comparator that will be used to order this
 105:      *         priority queue.  If {@code null}, the {@linkplain Comparable
 106:      *         natural ordering} of the elements will be used.
 107:      * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
 108:      *         than 1
 109:      */
 110:     public PriorityBlockingQueue(int initialCapacity,
 111:                                  Comparator<? super E> comparator) {
 112:         q = new PriorityQueue<E>(initialCapacity, comparator);
 113:     }
 114: 
 115:     /**
 116:      * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
 117:      * in the specified collection.  If the specified collection is a
 118:      * {@link SortedSet} or a {@link PriorityQueue},  this
 119:      * priority queue will be ordered according to the same ordering.
 120:      * Otherwise, this priority queue will be ordered according to the
 121:      * {@linkplain Comparable natural ordering} of its elements.
 122:      *
 123:      * @param  c the collection whose elements are to be placed
 124:      *         into this priority queue
 125:      * @throws ClassCastException if elements of the specified collection
 126:      *         cannot be compared to one another according to the priority
 127:      *         queue's ordering
 128:      * @throws NullPointerException if the specified collection or any
 129:      *         of its elements are null
 130:      */
 131:     public PriorityBlockingQueue(Collection<? extends E> c) {
 132:         q = new PriorityQueue<E>(c);
 133:     }
 134: 
 135:     /**
 136:      * Inserts the specified element into this priority queue.
 137:      *
 138:      * @param e the element to add
 139:      * @return <tt>true</tt> (as specified by {@link Collection#add})
 140:      * @throws ClassCastException if the specified element cannot be compared
 141:      *         with elements currently in the priority queue according to the
 142:      *         priority queue's ordering
 143:      * @throws NullPointerException if the specified element is null
 144:      */
 145:     public boolean add(E e) {
 146:         return offer(e);
 147:     }
 148: 
 149:     /**
 150:      * Inserts the specified element into this priority queue.
 151:      *
 152:      * @param e the element to add
 153:      * @return <tt>true</tt> (as specified by {@link Queue#offer})
 154:      * @throws ClassCastException if the specified element cannot be compared
 155:      *         with elements currently in the priority queue according to the
 156:      *         priority queue's ordering
 157:      * @throws NullPointerException if the specified element is null
 158:      */
 159:     public boolean offer(E e) {
 160:         final ReentrantLock lock = this.lock;
 161:         lock.lock();
 162:         try {
 163:             boolean ok = q.offer(e);
 164:             assert ok;
 165:             notEmpty.signal();
 166:             return true;
 167:         } finally {
 168:             lock.unlock();
 169:         }
 170:     }
 171: 
 172:     /**
 173:      * Inserts the specified element into this priority queue. As the queue is
 174:      * unbounded this method will never block.
 175:      *
 176:      * @param e the element to add
 177:      * @throws ClassCastException if the specified element cannot be compared
 178:      *         with elements currently in the priority queue according to the
 179:      *         priority queue's ordering
 180:      * @throws NullPointerException if the specified element is null
 181:      */
 182:     public void put(E e) {
 183:         offer(e); // never need to block
 184:     }
 185: 
 186:     /**
 187:      * Inserts the specified element into this priority queue. As the queue is
 188:      * unbounded this method will never block.
 189:      *
 190:      * @param e the element to add
 191:      * @param timeout This parameter is ignored as the method never blocks
 192:      * @param unit This parameter is ignored as the method never blocks
 193:      * @return <tt>true</tt>
 194:      * @throws ClassCastException if the specified element cannot be compared
 195:      *         with elements currently in the priority queue according to the
 196:      *         priority queue's ordering
 197:      * @throws NullPointerException if the specified element is null
 198:      */
 199:     public boolean offer(E e, long timeout, TimeUnit unit) {
 200:         return offer(e); // never need to block
 201:     }
 202: 
 203:     public E poll() {
 204:         final ReentrantLock lock = this.lock;
 205:         lock.lock();
 206:         try {
 207:             return q.poll();
 208:         } finally {
 209:             lock.unlock();
 210:         }
 211:     }
 212: 
 213:     public E take() throws InterruptedException {
 214:         final ReentrantLock lock = this.lock;
 215:         lock.lockInterruptibly();
 216:         try {
 217:             try {
 218:                 while (q.size() == 0)
 219:                     notEmpty.await();
 220:             } catch (InterruptedException ie) {
 221:                 notEmpty.signal(); // propagate to non-interrupted thread
 222:                 throw ie;
 223:             }
 224:             E x = q.poll();
 225:             assert x != null;
 226:             return x;
 227:         } finally {
 228:             lock.unlock();
 229:         }
 230:     }
 231: 
 232:     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 233:         long nanos = unit.toNanos(timeout);
 234:         final ReentrantLock lock = this.lock;
 235:         lock.lockInterruptibly();
 236:         try {
 237:             for (;;) {
 238:                 E x = q.poll();
 239:                 if (x != null)
 240:                     return x;
 241:                 if (nanos <= 0)
 242:                     return null;
 243:                 try {
 244:                     nanos = notEmpty.awaitNanos(nanos);
 245:                 } catch (InterruptedException ie) {
 246:                     notEmpty.signal(); // propagate to non-interrupted thread
 247:                     throw ie;
 248:                 }
 249:             }
 250:         } finally {
 251:             lock.unlock();
 252:         }
 253:     }
 254: 
 255:     public E peek() {
 256:         final ReentrantLock lock = this.lock;
 257:         lock.lock();
 258:         try {
 259:             return q.peek();
 260:         } finally {
 261:             lock.unlock();
 262:         }
 263:     }
 264: 
 265:     /**
 266:      * Returns the comparator used to order the elements in this queue,
 267:      * or <tt>null</tt> if this queue uses the {@linkplain Comparable
 268:      * natural ordering} of its elements.
 269:      *
 270:      * @return the comparator used to order the elements in this queue,
 271:      *         or <tt>null</tt> if this queue uses the natural
 272:      *         ordering of its elements
 273:      */
 274:     public Comparator<? super E> comparator() {
 275:         return q.comparator();
 276:     }
 277: 
 278:     public int size() {
 279:         final ReentrantLock lock = this.lock;
 280:         lock.lock();
 281:         try {
 282:             return q.size();
 283:         } finally {
 284:             lock.unlock();
 285:         }
 286:     }
 287: 
 288:     /**
 289:      * Always returns <tt>Integer.MAX_VALUE</tt> because
 290:      * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
 291:      * @return <tt>Integer.MAX_VALUE</tt>
 292:      */
 293:     public int remainingCapacity() {
 294:         return Integer.MAX_VALUE;
 295:     }
 296: 
 297:     /**
 298:      * Removes a single instance of the specified element from this queue,
 299:      * if it is present.  More formally, removes an element {@code e} such
 300:      * that {@code o.equals(e)}, if this queue contains one or more such
 301:      * elements.  Returns {@code true} if and only if this queue contained
 302:      * the specified element (or equivalently, if this queue changed as a
 303:      * result of the call).
 304:      *
 305:      * @param o element to be removed from this queue, if present
 306:      * @return <tt>true</tt> if this queue changed as a result of the call
 307:      */
 308:     public boolean remove(Object o) {
 309:         final ReentrantLock lock = this.lock;
 310:         lock.lock();
 311:         try {
 312:             return q.remove(o);
 313:         } finally {
 314:             lock.unlock();
 315:         }
 316:     }
 317: 
 318:     /**
 319:      * Returns {@code true} if this queue contains the specified element.
 320:      * More formally, returns {@code true} if and only if this queue contains
 321:      * at least one element {@code e} such that {@code o.equals(e)}.
 322:      *
 323:      * @param o object to be checked for containment in this queue
 324:      * @return <tt>true</tt> if this queue contains the specified element
 325:      */
 326:     public boolean contains(Object o) {
 327:         final ReentrantLock lock = this.lock;
 328:         lock.lock();
 329:         try {
 330:             return q.contains(o);
 331:         } finally {
 332:             lock.unlock();
 333:         }
 334:     }
 335: 
 336:     /**
 337:      * Returns an array containing all of the elements in this queue.
 338:      * The returned array elements are in no particular order.
 339:      *
 340:      * <p>The returned array will be "safe" in that no references to it are
 341:      * maintained by this queue.  (In other words, this method must allocate
 342:      * a new array).  The caller is thus free to modify the returned array.
 343:      *
 344:      * <p>This method acts as bridge between array-based and collection-based
 345:      * APIs.
 346:      *
 347:      * @return an array containing all of the elements in this queue
 348:      */
 349:     public Object[] toArray() {
 350:         final ReentrantLock lock = this.lock;
 351:         lock.lock();
 352:         try {
 353:             return q.toArray();
 354:         } finally {
 355:             lock.unlock();
 356:         }
 357:     }
 358: 
 359: 
 360:     public String toString() {
 361:         final ReentrantLock lock = this.lock;
 362:         lock.lock();
 363:         try {
 364:             return q.toString();
 365:         } finally {
 366:             lock.unlock();
 367:         }
 368:     }
 369: 
 370:     /**
 371:      * @throws UnsupportedOperationException {@inheritDoc}
 372:      * @throws ClassCastException            {@inheritDoc}
 373:      * @throws NullPointerException          {@inheritDoc}
 374:      * @throws IllegalArgumentException      {@inheritDoc}
 375:      */
 376:     public int drainTo(Collection<? super E> c) {
 377:         if (c == null)
 378:             throw new NullPointerException();
 379:         if (c == this)
 380:             throw new IllegalArgumentException();
 381:         final ReentrantLock lock = this.lock;
 382:         lock.lock();
 383:         try {
 384:             int n = 0;
 385:             E e;
 386:             while ( (e = q.poll()) != null) {
 387:                 c.add(e);
 388:                 ++n;
 389:             }
 390:             return n;
 391:         } finally {
 392:             lock.unlock();
 393:         }
 394:     }
 395: 
 396:     /**
 397:      * @throws UnsupportedOperationException {@inheritDoc}
 398:      * @throws ClassCastException            {@inheritDoc}
 399:      * @throws NullPointerException          {@inheritDoc}
 400:      * @throws IllegalArgumentException      {@inheritDoc}
 401:      */
 402:     public int drainTo(Collection<? super E> c, int maxElements) {
 403:         if (c == null)
 404:             throw new NullPointerException();
 405:         if (c == this)
 406:             throw new IllegalArgumentException();
 407:         if (maxElements <= 0)
 408:             return 0;
 409:         final ReentrantLock lock = this.lock;
 410:         lock.lock();
 411:         try {
 412:             int n = 0;
 413:             E e;
 414:             while (n < maxElements && (e = q.poll()) != null) {
 415:                 c.add(e);
 416:                 ++n;
 417:             }
 418:             return n;
 419:         } finally {
 420:             lock.unlock();
 421:         }
 422:     }
 423: 
 424:     /**
 425:      * Atomically removes all of the elements from this queue.
 426:      * The queue will be empty after this call returns.
 427:      */
 428:     public void clear() {
 429:         final ReentrantLock lock = this.lock;
 430:         lock.lock();
 431:         try {
 432:             q.clear();
 433:         } finally {
 434:             lock.unlock();
 435:         }
 436:     }
 437: 
 438:     /**
 439:      * Returns an array containing all of the elements in this queue; the
 440:      * runtime type of the returned array is that of the specified array.
 441:      * The returned array elements are in no particular order.
 442:      * If the queue fits in the specified array, it is returned therein.
 443:      * Otherwise, a new array is allocated with the runtime type of the
 444:      * specified array and the size of this queue.
 445:      *
 446:      * <p>If this queue fits in the specified array with room to spare
 447:      * (i.e., the array has more elements than this queue), the element in
 448:      * the array immediately following the end of the queue is set to
 449:      * <tt>null</tt>.
 450:      *
 451:      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 452:      * array-based and collection-based APIs.  Further, this method allows
 453:      * precise control over the runtime type of the output array, and may,
 454:      * under certain circumstances, be used to save allocation costs.
 455:      *
 456:      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
 457:      * The following code can be used to dump the queue into a newly
 458:      * allocated array of <tt>String</tt>:
 459:      *
 460:      * <pre>
 461:      *     String[] y = x.toArray(new String[0]);</pre>
 462:      *
 463:      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
 464:      * <tt>toArray()</tt>.
 465:      *
 466:      * @param a the array into which the elements of the queue are to
 467:      *          be stored, if it is big enough; otherwise, a new array of the
 468:      *          same runtime type is allocated for this purpose
 469:      * @return an array containing all of the elements in this queue
 470:      * @throws ArrayStoreException if the runtime type of the specified array
 471:      *         is not a supertype of the runtime type of every element in
 472:      *         this queue
 473:      * @throws NullPointerException if the specified array is null
 474:      */
 475:     public <T> T[] toArray(T[] a) {
 476:         final ReentrantLock lock = this.lock;
 477:         lock.lock();
 478:         try {
 479:             return q.toArray(a);
 480:         } finally {
 481:             lock.unlock();
 482:         }
 483:     }
 484: 
 485:     /**
 486:      * Returns an iterator over the elements in this queue. The
 487:      * iterator does not return the elements in any particular order.
 488:      * The returned <tt>Iterator</tt> is a "weakly consistent"
 489:      * iterator that will never throw {@link
 490:      * ConcurrentModificationException}, and guarantees to traverse
 491:      * elements as they existed upon construction of the iterator, and
 492:      * may (but is not guaranteed to) reflect any modifications
 493:      * subsequent to construction.
 494:      *
 495:      * @return an iterator over the elements in this queue
 496:      */
 497:     public Iterator<E> iterator() {
 498:         return new Itr(toArray());
 499:     }
 500: 
 501:     /**
 502:      * Snapshot iterator that works off copy of underlying q array.
 503:      */
 504:     private class Itr implements Iterator<E> {
 505:         final Object[] array; // Array of all elements
 506:     int cursor;           // index of next element to return;
 507:     int lastRet;          // index of last element, or -1 if no such
 508: 
 509:         Itr(Object[] array) {
 510:             lastRet = -1;
 511:             this.array = array;
 512:         }
 513: 
 514:         public boolean hasNext() {
 515:             return cursor < array.length;
 516:         }
 517: 
 518:         public E next() {
 519:             if (cursor >= array.length)
 520:                 throw new NoSuchElementException();
 521:             lastRet = cursor;
 522:             return (E)array[cursor++];
 523:         }
 524: 
 525:         public void remove() {
 526:             if (lastRet < 0)
 527:         throw new IllegalStateException();
 528:             Object x = array[lastRet];
 529:             lastRet = -1;
 530:             // Traverse underlying queue to find == element,
 531:             // not just a .equals element.
 532:             lock.lock();
 533:             try {
 534:                 for (Iterator it = q.iterator(); it.hasNext(); ) {
 535:                     if (it.next() == x) {
 536:                         it.remove();
 537:                         return;
 538:                     }
 539:                 }
 540:             } finally {
 541:                 lock.unlock();
 542:             }
 543:         }
 544:     }
 545: 
 546:     /**
 547:      * Saves the state to a stream (that is, serializes it).  This
 548:      * merely wraps default serialization within lock.  The
 549:      * serialization strategy for items is left to underlying
 550:      * Queue. Note that locking is not needed on deserialization, so
 551:      * readObject is not defined, just relying on default.
 552:      */
 553:     private void writeObject(java.io.ObjectOutputStream s)
 554:         throws java.io.IOException {
 555:         lock.lock();
 556:         try {
 557:             s.defaultWriteObject();
 558:         } finally {
 559:             lock.unlock();
 560:         }
 561:     }
 562: 
 563: }