Source for java.util.concurrent.LinkedBlockingQueue

   1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent;
   8: import java.util.concurrent.atomic.*;
   9: import java.util.concurrent.locks.*;
  10: import java.util.*;
  11: 
  12: /**
  13:  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
  14:  * linked nodes.
  15:  * This queue orders elements FIFO (first-in-first-out).
  16:  * The <em>head</em> of the queue is that element that has been on the
  17:  * queue the longest time.
  18:  * The <em>tail</em> of the queue is that element that has been on the
  19:  * queue the shortest time. New elements
  20:  * are inserted at the tail of the queue, and the queue retrieval
  21:  * operations obtain elements at the head of the queue.
  22:  * Linked queues typically have higher throughput than array-based queues but
  23:  * less predictable performance in most concurrent applications.
  24:  *
  25:  * <p> The optional capacity bound constructor argument serves as a
  26:  * way to prevent excessive queue expansion. The capacity, if unspecified,
  27:  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
  28:  * dynamically created upon each insertion unless this would bring the
  29:  * queue above capacity.
  30:  *
  31:  * <p>This class and its iterator implement all of the
  32:  * <em>optional</em> methods of the {@link Collection} and {@link
  33:  * Iterator} interfaces.
  34:  *
  35:  * <p>This class is a member of the
  36:  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  37:  * Java Collections Framework</a>.
  38:  *
  39:  * @since 1.5
  40:  * @author Doug Lea
  41:  * @param <E> the type of elements held in this collection
  42:  *
  43:  */
  44: public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  45:         implements BlockingQueue<E>, java.io.Serializable {
  46:     private static final long serialVersionUID = -6903933977591709194L;
  47: 
  48:     /*
  49:      * A variant of the "two lock queue" algorithm.  The putLock gates
  50:      * entry to put (and offer), and has an associated condition for
  51:      * waiting puts.  Similarly for the takeLock.  The "count" field
  52:      * that they both rely on is maintained as an atomic to avoid
  53:      * needing to get both locks in most cases. Also, to minimize need
  54:      * for puts to get takeLock and vice-versa, cascading notifies are
  55:      * used. When a put notices that it has enabled at least one take,
  56:      * it signals taker. That taker in turn signals others if more
  57:      * items have been entered since the signal. And symmetrically for
  58:      * takes signalling puts. Operations such as remove(Object) and
  59:      * iterators acquire both locks.
  60:      */
  61: 
  62:     /**
  63:      * Linked list node class
  64:      */
  65:     static class Node<E> {
  66:         /** The item, volatile to ensure barrier separating write and read */
  67:         volatile E item;
  68:         Node<E> next;
  69:         Node(E x) { item = x; }
  70:     }
  71: 
  72:     /** The capacity bound, or Integer.MAX_VALUE if none */
  73:     private final int capacity;
  74: 
  75:     /** Current number of elements */
  76:     private final AtomicInteger count = new AtomicInteger(0);
  77: 
  78:     /** Head of linked list */
  79:     private transient Node<E> head;
  80: 
  81:     /** Tail of linked list */
  82:     private transient Node<E> last;
  83: 
  84:     /** Lock held by take, poll, etc */
  85:     private final ReentrantLock takeLock = new ReentrantLock();
  86: 
  87:     /** Wait queue for waiting takes */
  88:     private final Condition notEmpty = takeLock.newCondition();
  89: 
  90:     /** Lock held by put, offer, etc */
  91:     private final ReentrantLock putLock = new ReentrantLock();
  92: 
  93:     /** Wait queue for waiting puts */
  94:     private final Condition notFull = putLock.newCondition();
  95: 
  96:     /**
  97:      * Signals a waiting take. Called only from put/offer (which do not
  98:      * otherwise ordinarily lock takeLock.)
  99:      */
 100:     private void signalNotEmpty() {
 101:         final ReentrantLock takeLock = this.takeLock;
 102:         takeLock.lock();
 103:         try {
 104:             notEmpty.signal();
 105:         } finally {
 106:             takeLock.unlock();
 107:         }
 108:     }
 109: 
 110:     /**
 111:      * Signals a waiting put. Called only from take/poll.
 112:      */
 113:     private void signalNotFull() {
 114:         final ReentrantLock putLock = this.putLock;
 115:         putLock.lock();
 116:         try {
 117:             notFull.signal();
 118:         } finally {
 119:             putLock.unlock();
 120:         }
 121:     }
 122: 
 123:     /**
 124:      * Creates a node and links it at end of queue.
 125:      * @param x the item
 126:      */
 127:     private void insert(E x) {
 128:         last = last.next = new Node<E>(x);
 129:     }
 130: 
 131:     /**
 132:      * Removes a node from head of queue,
 133:      * @return the node
 134:      */
 135:     private E extract() {
 136:         Node<E> first = head.next;
 137:         head = first;
 138:         E x = first.item;
 139:         first.item = null;
 140:         return x;
 141:     }
 142: 
 143:     /**
 144:      * Lock to prevent both puts and takes.
 145:      */
 146:     private void fullyLock() {
 147:         putLock.lock();
 148:         takeLock.lock();
 149:     }
 150: 
 151:     /**
 152:      * Unlock to allow both puts and takes.
 153:      */
 154:     private void fullyUnlock() {
 155:         takeLock.unlock();
 156:         putLock.unlock();
 157:     }
 158: 
 159: 
 160:     /**
 161:      * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
 162:      * {@link Integer#MAX_VALUE}.
 163:      */
 164:     public LinkedBlockingQueue() {
 165:         this(Integer.MAX_VALUE);
 166:     }
 167: 
 168:     /**
 169:      * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
 170:      *
 171:      * @param capacity the capacity of this queue
 172:      * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
 173:      *         than zero
 174:      */
 175:     public LinkedBlockingQueue(int capacity) {
 176:         if (capacity <= 0) throw new IllegalArgumentException();
 177:         this.capacity = capacity;
 178:         last = head = new Node<E>(null);
 179:     }
 180: 
 181:     /**
 182:      * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
 183:      * {@link Integer#MAX_VALUE}, initially containing the elements of the
 184:      * given collection,
 185:      * added in traversal order of the collection's iterator.
 186:      *
 187:      * @param c the collection of elements to initially contain
 188:      * @throws NullPointerException if the specified collection or any
 189:      *         of its elements are null
 190:      */
 191:     public LinkedBlockingQueue(Collection<? extends E> c) {
 192:         this(Integer.MAX_VALUE);
 193:         for (E e : c)
 194:             add(e);
 195:     }
 196: 
 197: 
 198:     // this doc comment is overridden to remove the reference to collections
 199:     // greater in size than Integer.MAX_VALUE
 200:     /**
 201:      * Returns the number of elements in this queue.
 202:      *
 203:      * @return the number of elements in this queue
 204:      */
 205:     public int size() {
 206:         return count.get();
 207:     }
 208: 
 209:     // this doc comment is a modified copy of the inherited doc comment,
 210:     // without the reference to unlimited queues.
 211:     /**
 212:      * Returns the number of additional elements that this queue can ideally
 213:      * (in the absence of memory or resource constraints) accept without
 214:      * blocking. This is always equal to the initial capacity of this queue
 215:      * less the current <tt>size</tt> of this queue.
 216:      *
 217:      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
 218:      * an element will succeed by inspecting <tt>remainingCapacity</tt>
 219:      * because it may be the case that another thread is about to
 220:      * insert or remove an element.
 221:      */
 222:     public int remainingCapacity() {
 223:         return capacity - count.get();
 224:     }
 225: 
 226:     /**
 227:      * Inserts the specified element at the tail of this queue, waiting if
 228:      * necessary for space to become available.
 229:      *
 230:      * @throws InterruptedException {@inheritDoc}
 231:      * @throws NullPointerException {@inheritDoc}
 232:      */
 233:     public void put(E e) throws InterruptedException {
 234:         if (e == null) throw new NullPointerException();
 235:         // Note: convention in all put/take/etc is to preset
 236:         // local var holding count  negative to indicate failure unless set.
 237:         int c = -1;
 238:         final ReentrantLock putLock = this.putLock;
 239:         final AtomicInteger count = this.count;
 240:         putLock.lockInterruptibly();
 241:         try {
 242:             /*
 243:              * Note that count is used in wait guard even though it is
 244:              * not protected by lock. This works because count can
 245:              * only decrease at this point (all other puts are shut
 246:              * out by lock), and we (or some other waiting put) are
 247:              * signalled if it ever changes from
 248:              * capacity. Similarly for all other uses of count in
 249:              * other wait guards.
 250:              */
 251:             try {
 252:                 while (count.get() == capacity)
 253:                     notFull.await();
 254:             } catch (InterruptedException ie) {
 255:                 notFull.signal(); // propagate to a non-interrupted thread
 256:                 throw ie;
 257:             }
 258:             insert(e);
 259:             c = count.getAndIncrement();
 260:             if (c + 1 < capacity)
 261:                 notFull.signal();
 262:         } finally {
 263:             putLock.unlock();
 264:         }
 265:         if (c == 0)
 266:             signalNotEmpty();
 267:     }
 268: 
 269:     /**
 270:      * Inserts the specified element at the tail of this queue, waiting if
 271:      * necessary up to the specified wait time for space to become available.
 272:      *
 273:      * @return <tt>true</tt> if successful, or <tt>false</tt> if
 274:      *         the specified waiting time elapses before space is available.
 275:      * @throws InterruptedException {@inheritDoc}
 276:      * @throws NullPointerException {@inheritDoc}
 277:      */
 278:     public boolean offer(E e, long timeout, TimeUnit unit)
 279:         throws InterruptedException {
 280: 
 281:         if (e == null) throw new NullPointerException();
 282:         long nanos = unit.toNanos(timeout);
 283:         int c = -1;
 284:         final ReentrantLock putLock = this.putLock;
 285:         final AtomicInteger count = this.count;
 286:         putLock.lockInterruptibly();
 287:         try {
 288:             for (;;) {
 289:                 if (count.get() < capacity) {
 290:                     insert(e);
 291:                     c = count.getAndIncrement();
 292:                     if (c + 1 < capacity)
 293:                         notFull.signal();
 294:                     break;
 295:                 }
 296:                 if (nanos <= 0)
 297:                     return false;
 298:                 try {
 299:                     nanos = notFull.awaitNanos(nanos);
 300:                 } catch (InterruptedException ie) {
 301:                     notFull.signal(); // propagate to a non-interrupted thread
 302:                     throw ie;
 303:                 }
 304:             }
 305:         } finally {
 306:             putLock.unlock();
 307:         }
 308:         if (c == 0)
 309:             signalNotEmpty();
 310:         return true;
 311:     }
 312: 
 313:     /**
 314:      * Inserts the specified element at the tail of this queue if it is
 315:      * possible to do so immediately without exceeding the queue's capacity,
 316:      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
 317:      * is full.
 318:      * When using a capacity-restricted queue, this method is generally
 319:      * preferable to method {@link BlockingQueue#add add}, which can fail to
 320:      * insert an element only by throwing an exception.
 321:      *
 322:      * @throws NullPointerException if the specified element is null
 323:      */
 324:     public boolean offer(E e) {
 325:         if (e == null) throw new NullPointerException();
 326:         final AtomicInteger count = this.count;
 327:         if (count.get() == capacity)
 328:             return false;
 329:         int c = -1;
 330:         final ReentrantLock putLock = this.putLock;
 331:         putLock.lock();
 332:         try {
 333:             if (count.get() < capacity) {
 334:                 insert(e);
 335:                 c = count.getAndIncrement();
 336:                 if (c + 1 < capacity)
 337:                     notFull.signal();
 338:             }
 339:         } finally {
 340:             putLock.unlock();
 341:         }
 342:         if (c == 0)
 343:             signalNotEmpty();
 344:         return c >= 0;
 345:     }
 346: 
 347: 
 348:     public E take() throws InterruptedException {
 349:         E x;
 350:         int c = -1;
 351:         final AtomicInteger count = this.count;
 352:         final ReentrantLock takeLock = this.takeLock;
 353:         takeLock.lockInterruptibly();
 354:         try {
 355:             try {
 356:                 while (count.get() == 0)
 357:                     notEmpty.await();
 358:             } catch (InterruptedException ie) {
 359:                 notEmpty.signal(); // propagate to a non-interrupted thread
 360:                 throw ie;
 361:             }
 362: 
 363:             x = extract();
 364:             c = count.getAndDecrement();
 365:             if (c > 1)
 366:                 notEmpty.signal();
 367:         } finally {
 368:             takeLock.unlock();
 369:         }
 370:         if (c == capacity)
 371:             signalNotFull();
 372:         return x;
 373:     }
 374: 
 375:     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 376:         E x = null;
 377:         int c = -1;
 378:         long nanos = unit.toNanos(timeout);
 379:         final AtomicInteger count = this.count;
 380:         final ReentrantLock takeLock = this.takeLock;
 381:         takeLock.lockInterruptibly();
 382:         try {
 383:             for (;;) {
 384:                 if (count.get() > 0) {
 385:                     x = extract();
 386:                     c = count.getAndDecrement();
 387:                     if (c > 1)
 388:                         notEmpty.signal();
 389:                     break;
 390:                 }
 391:                 if (nanos <= 0)
 392:                     return null;
 393:                 try {
 394:                     nanos = notEmpty.awaitNanos(nanos);
 395:                 } catch (InterruptedException ie) {
 396:                     notEmpty.signal(); // propagate to a non-interrupted thread
 397:                     throw ie;
 398:                 }
 399:             }
 400:         } finally {
 401:             takeLock.unlock();
 402:         }
 403:         if (c == capacity)
 404:             signalNotFull();
 405:         return x;
 406:     }
 407: 
 408:     public E poll() {
 409:         final AtomicInteger count = this.count;
 410:         if (count.get() == 0)
 411:             return null;
 412:         E x = null;
 413:         int c = -1;
 414:         final ReentrantLock takeLock = this.takeLock;
 415:         takeLock.lock();
 416:         try {
 417:             if (count.get() > 0) {
 418:                 x = extract();
 419:                 c = count.getAndDecrement();
 420:                 if (c > 1)
 421:                     notEmpty.signal();
 422:             }
 423:         } finally {
 424:             takeLock.unlock();
 425:         }
 426:         if (c == capacity)
 427:             signalNotFull();
 428:         return x;
 429:     }
 430: 
 431: 
 432:     public E peek() {
 433:         if (count.get() == 0)
 434:             return null;
 435:         final ReentrantLock takeLock = this.takeLock;
 436:         takeLock.lock();
 437:         try {
 438:             Node<E> first = head.next;
 439:             if (first == null)
 440:                 return null;
 441:             else
 442:                 return first.item;
 443:         } finally {
 444:             takeLock.unlock();
 445:         }
 446:     }
 447: 
 448:     /**
 449:      * Removes a single instance of the specified element from this queue,
 450:      * if it is present.  More formally, removes an element <tt>e</tt> such
 451:      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
 452:      * elements.
 453:      * Returns <tt>true</tt> if this queue contained the specified element
 454:      * (or equivalently, if this queue changed as a result of the call).
 455:      *
 456:      * @param o element to be removed from this queue, if present
 457:      * @return <tt>true</tt> if this queue changed as a result of the call
 458:      */
 459:     public boolean remove(Object o) {
 460:         if (o == null) return false;
 461:         boolean removed = false;
 462:         fullyLock();
 463:         try {
 464:             Node<E> trail = head;
 465:             Node<E> p = head.next;
 466:             while (p != null) {
 467:                 if (o.equals(p.item)) {
 468:                     removed = true;
 469:                     break;
 470:                 }
 471:                 trail = p;
 472:                 p = p.next;
 473:             }
 474:             if (removed) {
 475:                 p.item = null;
 476:                 trail.next = p.next;
 477:                 if (last == p)
 478:                     last = trail;
 479:                 if (count.getAndDecrement() == capacity)
 480:                     notFull.signalAll();
 481:             }
 482:         } finally {
 483:             fullyUnlock();
 484:         }
 485:         return removed;
 486:     }
 487: 
 488:     /**
 489:      * Returns an array containing all of the elements in this queue, in
 490:      * proper sequence.
 491:      *
 492:      * <p>The returned array will be "safe" in that no references to it are
 493:      * maintained by this queue.  (In other words, this method must allocate
 494:      * a new array).  The caller is thus free to modify the returned array.
 495:      *
 496:      * <p>This method acts as bridge between array-based and collection-based
 497:      * APIs.
 498:      *
 499:      * @return an array containing all of the elements in this queue
 500:      */
 501:     public Object[] toArray() {
 502:         fullyLock();
 503:         try {
 504:             int size = count.get();
 505:             Object[] a = new Object[size];
 506:             int k = 0;
 507:             for (Node<E> p = head.next; p != null; p = p.next)
 508:                 a[k++] = p.item;
 509:             return a;
 510:         } finally {
 511:             fullyUnlock();
 512:         }
 513:     }
 514: 
 515:     /**
 516:      * Returns an array containing all of the elements in this queue, in
 517:      * proper sequence; the runtime type of the returned array is that of
 518:      * the specified array.  If the queue fits in the specified array, it
 519:      * is returned therein.  Otherwise, a new array is allocated with the
 520:      * runtime type of the specified array and the size of this queue.
 521:      *
 522:      * <p>If this queue fits in the specified array with room to spare
 523:      * (i.e., the array has more elements than this queue), the element in
 524:      * the array immediately following the end of the queue is set to
 525:      * <tt>null</tt>.
 526:      *
 527:      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 528:      * array-based and collection-based APIs.  Further, this method allows
 529:      * precise control over the runtime type of the output array, and may,
 530:      * under certain circumstances, be used to save allocation costs.
 531:      *
 532:      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
 533:      * The following code can be used to dump the queue into a newly
 534:      * allocated array of <tt>String</tt>:
 535:      *
 536:      * <pre>
 537:      *     String[] y = x.toArray(new String[0]);</pre>
 538:      *
 539:      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
 540:      * <tt>toArray()</tt>.
 541:      *
 542:      * @param a the array into which the elements of the queue are to
 543:      *          be stored, if it is big enough; otherwise, a new array of the
 544:      *          same runtime type is allocated for this purpose
 545:      * @return an array containing all of the elements in this queue
 546:      * @throws ArrayStoreException if the runtime type of the specified array
 547:      *         is not a supertype of the runtime type of every element in
 548:      *         this queue
 549:      * @throws NullPointerException if the specified array is null
 550:      */
 551:     public <T> T[] toArray(T[] a) {
 552:         fullyLock();
 553:         try {
 554:             int size = count.get();
 555:             if (a.length < size)
 556:                 a = (T[])java.lang.reflect.Array.newInstance
 557:                     (a.getClass().getComponentType(), size);
 558: 
 559:             int k = 0;
 560:             for (Node p = head.next; p != null; p = p.next)
 561:                 a[k++] = (T)p.item;
 562:             if (a.length > k)
 563:                 a[k] = null;
 564:             return a;
 565:         } finally {
 566:             fullyUnlock();
 567:         }
 568:     }
 569: 
 570:     public String toString() {
 571:         fullyLock();
 572:         try {
 573:             return super.toString();
 574:         } finally {
 575:             fullyUnlock();
 576:         }
 577:     }
 578: 
 579:     /**
 580:      * Atomically removes all of the elements from this queue.
 581:      * The queue will be empty after this call returns.
 582:      */
 583:     public void clear() {
 584:         fullyLock();
 585:         try {
 586:             head.next = null;
 587:         assert head.item == null;
 588:         last = head;
 589:             if (count.getAndSet(0) == capacity)
 590:                 notFull.signalAll();
 591:         } finally {
 592:             fullyUnlock();
 593:         }
 594:     }
 595: 
 596:     /**
 597:      * @throws UnsupportedOperationException {@inheritDoc}
 598:      * @throws ClassCastException            {@inheritDoc}
 599:      * @throws NullPointerException          {@inheritDoc}
 600:      * @throws IllegalArgumentException      {@inheritDoc}
 601:      */
 602:     public int drainTo(Collection<? super E> c) {
 603:         if (c == null)
 604:             throw new NullPointerException();
 605:         if (c == this)
 606:             throw new IllegalArgumentException();
 607:         Node<E> first;
 608:         fullyLock();
 609:         try {
 610:             first = head.next;
 611:             head.next = null;
 612:         assert head.item == null;
 613:         last = head;
 614:             if (count.getAndSet(0) == capacity)
 615:                 notFull.signalAll();
 616:         } finally {
 617:             fullyUnlock();
 618:         }
 619:         // Transfer the elements outside of locks
 620:         int n = 0;
 621:         for (Node<E> p = first; p != null; p = p.next) {
 622:             c.add(p.item);
 623:             p.item = null;
 624:             ++n;
 625:         }
 626:         return n;
 627:     }
 628: 
 629:     /**
 630:      * @throws UnsupportedOperationException {@inheritDoc}
 631:      * @throws ClassCastException            {@inheritDoc}
 632:      * @throws NullPointerException          {@inheritDoc}
 633:      * @throws IllegalArgumentException      {@inheritDoc}
 634:      */
 635:     public int drainTo(Collection<? super E> c, int maxElements) {
 636:         if (c == null)
 637:             throw new NullPointerException();
 638:         if (c == this)
 639:             throw new IllegalArgumentException();
 640:         fullyLock();
 641:         try {
 642:             int n = 0;
 643:             Node<E> p = head.next;
 644:             while (p != null && n < maxElements) {
 645:                 c.add(p.item);
 646:                 p.item = null;
 647:                 p = p.next;
 648:                 ++n;
 649:             }
 650:             if (n != 0) {
 651:                 head.next = p;
 652:         assert head.item == null;
 653:         if (p == null)
 654:             last = head;
 655:                 if (count.getAndAdd(-n) == capacity)
 656:                     notFull.signalAll();
 657:             }
 658:             return n;
 659:         } finally {
 660:             fullyUnlock();
 661:         }
 662:     }
 663: 
 664:     /**
 665:      * Returns an iterator over the elements in this queue in proper sequence.
 666:      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
 667:      * will never throw {@link ConcurrentModificationException},
 668:      * and guarantees to traverse elements as they existed upon
 669:      * construction of the iterator, and may (but is not guaranteed to)
 670:      * reflect any modifications subsequent to construction.
 671:      *
 672:      * @return an iterator over the elements in this queue in proper sequence
 673:      */
 674:     public Iterator<E> iterator() {
 675:       return new Itr();
 676:     }
 677: 
 678:     private class Itr implements Iterator<E> {
 679:         /*
 680:          * Basic weak-consistent iterator.  At all times hold the next
 681:          * item to hand out so that if hasNext() reports true, we will
 682:          * still have it to return even if lost race with a take etc.
 683:          */
 684:         private Node<E> current;
 685:         private Node<E> lastRet;
 686:         private E currentElement;
 687: 
 688:         Itr() {
 689:             final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
 690:             final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
 691:             putLock.lock();
 692:             takeLock.lock();
 693:             try {
 694:                 current = head.next;
 695:                 if (current != null)
 696:                     currentElement = current.item;
 697:             } finally {
 698:                 takeLock.unlock();
 699:                 putLock.unlock();
 700:             }
 701:         }
 702: 
 703:         public boolean hasNext() {
 704:             return current != null;
 705:         }
 706: 
 707:         public E next() {
 708:             final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
 709:             final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
 710:             putLock.lock();
 711:             takeLock.lock();
 712:             try {
 713:                 if (current == null)
 714:                     throw new NoSuchElementException();
 715:                 E x = currentElement;
 716:                 lastRet = current;
 717:                 current = current.next;
 718:                 if (current != null)
 719:                     currentElement = current.item;
 720:                 return x;
 721:             } finally {
 722:                 takeLock.unlock();
 723:                 putLock.unlock();
 724:             }
 725:         }
 726: 
 727:         public void remove() {
 728:             if (lastRet == null)
 729:                 throw new IllegalStateException();
 730:             final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
 731:             final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
 732:             putLock.lock();
 733:             takeLock.lock();
 734:             try {
 735:                 Node<E> node = lastRet;
 736:                 lastRet = null;
 737:                 Node<E> trail = head;
 738:                 Node<E> p = head.next;
 739:                 while (p != null && p != node) {
 740:                     trail = p;
 741:                     p = p.next;
 742:                 }
 743:                 if (p == node) {
 744:                     p.item = null;
 745:                     trail.next = p.next;
 746:                     if (last == p)
 747:                         last = trail;
 748:                     int c = count.getAndDecrement();
 749:                     if (c == capacity)
 750:                         notFull.signalAll();
 751:                 }
 752:             } finally {
 753:                 takeLock.unlock();
 754:                 putLock.unlock();
 755:             }
 756:         }
 757:     }
 758: 
 759:     /**
 760:      * Save the state to a stream (that is, serialize it).
 761:      *
 762:      * @serialData The capacity is emitted (int), followed by all of
 763:      * its elements (each an <tt>Object</tt>) in the proper order,
 764:      * followed by a null
 765:      * @param s the stream
 766:      */
 767:     private void writeObject(java.io.ObjectOutputStream s)
 768:         throws java.io.IOException {
 769: 
 770:         fullyLock();
 771:         try {
 772:             // Write out any hidden stuff, plus capacity
 773:             s.defaultWriteObject();
 774: 
 775:             // Write out all elements in the proper order.
 776:             for (Node<E> p = head.next; p != null; p = p.next)
 777:                 s.writeObject(p.item);
 778: 
 779:             // Use trailing null as sentinel
 780:             s.writeObject(null);
 781:         } finally {
 782:             fullyUnlock();
 783:         }
 784:     }
 785: 
 786:     /**
 787:      * Reconstitute this queue instance from a stream (that is,
 788:      * deserialize it).
 789:      * @param s the stream
 790:      */
 791:     private void readObject(java.io.ObjectInputStream s)
 792:         throws java.io.IOException, ClassNotFoundException {
 793:         // Read in capacity, and any hidden stuff
 794:         s.defaultReadObject();
 795: 
 796:         count.set(0);
 797:         last = head = new Node<E>(null);
 798: 
 799:         // Read in all elements and place in queue
 800:         for (;;) {
 801:             E item = (E)s.readObject();
 802:             if (item == null)
 803:                 break;
 804:             add(item);
 805:         }
 806:     }
 807: }