Source for java.util.concurrent.ArrayBlockingQueue

   1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent;
   8: import java.util.concurrent.locks.*;
   9: import java.util.*;
  10: 
  11: /**
  12:  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
  13:  * array.  This queue orders elements FIFO (first-in-first-out).  The
  14:  * <em>head</em> of the queue is that element that has been on the
  15:  * queue the longest time.  The <em>tail</em> of the queue is that
  16:  * element that has been on the queue the shortest time. New elements
  17:  * are inserted at the tail of the queue, and the queue retrieval
  18:  * operations obtain elements at the head of the queue.
  19:  *
  20:  * <p>This is a classic &quot;bounded buffer&quot;, in which a
  21:  * fixed-sized array holds elements inserted by producers and
  22:  * extracted by consumers.  Once created, the capacity cannot be
  23:  * increased.  Attempts to <tt>put</tt> an element into a full queue
  24:  * will result in the operation blocking; attempts to <tt>take</tt> an
  25:  * element from an empty queue will similarly block.
  26:  *
  27:  * <p> This class supports an optional fairness policy for ordering
  28:  * waiting producer and consumer threads.  By default, this ordering
  29:  * is not guaranteed. However, a queue constructed with fairness set
  30:  * to <tt>true</tt> grants threads access in FIFO order. Fairness
  31:  * generally decreases throughput but reduces variability and avoids
  32:  * starvation.
  33:  *
  34:  * <p>This class and its iterator implement all of the
  35:  * <em>optional</em> methods of the {@link Collection} and {@link
  36:  * Iterator} interfaces.
  37:  *
  38:  * <p>This class is a member of the
  39:  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  40:  * Java Collections Framework</a>.
  41:  *
  42:  * @since 1.5
  43:  * @author Doug Lea
  44:  * @param <E> the type of elements held in this collection
  45:  */
  46: public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  47:         implements BlockingQueue<E>, java.io.Serializable {
  48: 
  49:     /**
  50:      * Serialization ID. This class relies on default serialization
  51:      * even for the items array, which is default-serialized, even if
  52:      * it is empty. Otherwise it could not be declared final, which is
  53:      * necessary here.
  54:      */
  55:     private static final long serialVersionUID = -817911632652898426L;
  56: 
  57:     /** The queued items  */
  58:     private final E[] items;
  59:     /** items index for next take, poll or remove */
  60:     private int takeIndex;
  61:     /** items index for next put, offer, or add. */
  62:     private int putIndex;
  63:     /** Number of items in the queue */
  64:     private int count;
  65: 
  66:     /*
  67:      * Concurrency control uses the classic two-condition algorithm
  68:      * found in any textbook.
  69:      */
  70: 
  71:     /** Main lock guarding all access */
  72:     private final ReentrantLock lock;
  73:     /** Condition for waiting takes */
  74:     private final Condition notEmpty;
  75:     /** Condition for waiting puts */
  76:     private final Condition notFull;
  77: 
  78:     // Internal helper methods
  79: 
  80:     /**
  81:      * Circularly increment i.
  82:      */
  83:     final int inc(int i) {
  84:         return (++i == items.length)? 0 : i;
  85:     }
  86: 
  87:     /**
  88:      * Inserts element at current put position, advances, and signals.
  89:      * Call only when holding lock.
  90:      */
  91:     private void insert(E x) {
  92:         items[putIndex] = x;
  93:         putIndex = inc(putIndex);
  94:         ++count;
  95:         notEmpty.signal();
  96:     }
  97: 
  98:     /**
  99:      * Extracts element at current take position, advances, and signals.
 100:      * Call only when holding lock.
 101:      */
 102:     private E extract() {
 103:         final E[] items = this.items;
 104:         E x = items[takeIndex];
 105:         items[takeIndex] = null;
 106:         takeIndex = inc(takeIndex);
 107:         --count;
 108:         notFull.signal();
 109:         return x;
 110:     }
 111: 
 112:     /**
 113:      * Utility for remove and iterator.remove: Delete item at position i.
 114:      * Call only when holding lock.
 115:      */
 116:     void removeAt(int i) {
 117:         final E[] items = this.items;
 118:         // if removing front item, just advance
 119:         if (i == takeIndex) {
 120:             items[takeIndex] = null;
 121:             takeIndex = inc(takeIndex);
 122:         } else {
 123:             // slide over all others up through putIndex.
 124:             for (;;) {
 125:                 int nexti = inc(i);
 126:                 if (nexti != putIndex) {
 127:                     items[i] = items[nexti];
 128:                     i = nexti;
 129:                 } else {
 130:                     items[i] = null;
 131:                     putIndex = i;
 132:                     break;
 133:                 }
 134:             }
 135:         }
 136:         --count;
 137:         notFull.signal();
 138:     }
 139: 
 140:     /**
 141:      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
 142:      * capacity and default access policy.
 143:      *
 144:      * @param capacity the capacity of this queue
 145:      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
 146:      */
 147:     public ArrayBlockingQueue(int capacity) {
 148:         this(capacity, false);
 149:     }
 150: 
 151:     /**
 152:      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
 153:      * capacity and the specified access policy.
 154:      *
 155:      * @param capacity the capacity of this queue
 156:      * @param fair if <tt>true</tt> then queue accesses for threads blocked
 157:      *        on insertion or removal, are processed in FIFO order;
 158:      *        if <tt>false</tt> the access order is unspecified.
 159:      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
 160:      */
 161:     public ArrayBlockingQueue(int capacity, boolean fair) {
 162:         if (capacity <= 0)
 163:             throw new IllegalArgumentException();
 164:         this.items = (E[]) new Object[capacity];
 165:         lock = new ReentrantLock(fair);
 166:         notEmpty = lock.newCondition();
 167:         notFull =  lock.newCondition();
 168:     }
 169: 
 170:     /**
 171:      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
 172:      * capacity, the specified access policy and initially containing the
 173:      * elements of the given collection,
 174:      * added in traversal order of the collection's iterator.
 175:      *
 176:      * @param capacity the capacity of this queue
 177:      * @param fair if <tt>true</tt> then queue accesses for threads blocked
 178:      *        on insertion or removal, are processed in FIFO order;
 179:      *        if <tt>false</tt> the access order is unspecified.
 180:      * @param c the collection of elements to initially contain
 181:      * @throws IllegalArgumentException if <tt>capacity</tt> is less than
 182:      *         <tt>c.size()</tt>, or less than 1.
 183:      * @throws NullPointerException if the specified collection or any
 184:      *         of its elements are null
 185:      */
 186:     public ArrayBlockingQueue(int capacity, boolean fair,
 187:                               Collection<? extends E> c) {
 188:         this(capacity, fair);
 189:         if (capacity < c.size())
 190:             throw new IllegalArgumentException();
 191: 
 192:         for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
 193:             add(it.next());
 194:     }
 195: 
 196:     /**
 197:      * Inserts the specified element at the tail of this queue if it is
 198:      * possible to do so immediately without exceeding the queue's capacity,
 199:      * returning <tt>true</tt> upon success and throwing an
 200:      * <tt>IllegalStateException</tt> if this queue is full.
 201:      *
 202:      * @param e the element to add
 203:      * @return <tt>true</tt> (as specified by {@link Collection#add})
 204:      * @throws IllegalStateException if this queue is full
 205:      * @throws NullPointerException if the specified element is null
 206:      */
 207:     public boolean add(E e) {
 208:     return super.add(e);
 209:     }
 210: 
 211:     /**
 212:      * Inserts the specified element at the tail of this queue if it is
 213:      * possible to do so immediately without exceeding the queue's capacity,
 214:      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
 215:      * is full.  This method is generally preferable to method {@link #add},
 216:      * which can fail to insert an element only by throwing an exception.
 217:      *
 218:      * @throws NullPointerException if the specified element is null
 219:      */
 220:     public boolean offer(E e) {
 221:         if (e == null) throw new NullPointerException();
 222:         final ReentrantLock lock = this.lock;
 223:         lock.lock();
 224:         try {
 225:             if (count == items.length)
 226:                 return false;
 227:             else {
 228:                 insert(e);
 229:                 return true;
 230:             }
 231:         } finally {
 232:             lock.unlock();
 233:         }
 234:     }
 235: 
 236:     /**
 237:      * Inserts the specified element at the tail of this queue, waiting
 238:      * for space to become available if the queue is full.
 239:      *
 240:      * @throws InterruptedException {@inheritDoc}
 241:      * @throws NullPointerException {@inheritDoc}
 242:      */
 243:     public void put(E e) throws InterruptedException {
 244:         if (e == null) throw new NullPointerException();
 245:         final E[] items = this.items;
 246:         final ReentrantLock lock = this.lock;
 247:         lock.lockInterruptibly();
 248:         try {
 249:             try {
 250:                 while (count == items.length)
 251:                     notFull.await();
 252:             } catch (InterruptedException ie) {
 253:                 notFull.signal(); // propagate to non-interrupted thread
 254:                 throw ie;
 255:             }
 256:             insert(e);
 257:         } finally {
 258:             lock.unlock();
 259:         }
 260:     }
 261: 
 262:     /**
 263:      * Inserts the specified element at the tail of this queue, waiting
 264:      * up to the specified wait time for space to become available if
 265:      * the queue is full.
 266:      *
 267:      * @throws InterruptedException {@inheritDoc}
 268:      * @throws NullPointerException {@inheritDoc}
 269:      */
 270:     public boolean offer(E e, long timeout, TimeUnit unit)
 271:         throws InterruptedException {
 272: 
 273:         if (e == null) throw new NullPointerException();
 274:     long nanos = unit.toNanos(timeout);
 275:         final ReentrantLock lock = this.lock;
 276:         lock.lockInterruptibly();
 277:         try {
 278:             for (;;) {
 279:                 if (count != items.length) {
 280:                     insert(e);
 281:                     return true;
 282:                 }
 283:                 if (nanos <= 0)
 284:                     return false;
 285:                 try {
 286:                     nanos = notFull.awaitNanos(nanos);
 287:                 } catch (InterruptedException ie) {
 288:                     notFull.signal(); // propagate to non-interrupted thread
 289:                     throw ie;
 290:                 }
 291:             }
 292:         } finally {
 293:             lock.unlock();
 294:         }
 295:     }
 296: 
 297:     public E poll() {
 298:         final ReentrantLock lock = this.lock;
 299:         lock.lock();
 300:         try {
 301:             if (count == 0)
 302:                 return null;
 303:             E x = extract();
 304:             return x;
 305:         } finally {
 306:             lock.unlock();
 307:         }
 308:     }
 309: 
 310:     public E take() throws InterruptedException {
 311:         final ReentrantLock lock = this.lock;
 312:         lock.lockInterruptibly();
 313:         try {
 314:             try {
 315:                 while (count == 0)
 316:                     notEmpty.await();
 317:             } catch (InterruptedException ie) {
 318:                 notEmpty.signal(); // propagate to non-interrupted thread
 319:                 throw ie;
 320:             }
 321:             E x = extract();
 322:             return x;
 323:         } finally {
 324:             lock.unlock();
 325:         }
 326:     }
 327: 
 328:     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 329:     long nanos = unit.toNanos(timeout);
 330:         final ReentrantLock lock = this.lock;
 331:         lock.lockInterruptibly();
 332:         try {
 333:             for (;;) {
 334:                 if (count != 0) {
 335:                     E x = extract();
 336:                     return x;
 337:                 }
 338:                 if (nanos <= 0)
 339:                     return null;
 340:                 try {
 341:                     nanos = notEmpty.awaitNanos(nanos);
 342:                 } catch (InterruptedException ie) {
 343:                     notEmpty.signal(); // propagate to non-interrupted thread
 344:                     throw ie;
 345:                 }
 346: 
 347:             }
 348:         } finally {
 349:             lock.unlock();
 350:         }
 351:     }
 352: 
 353:     public E peek() {
 354:         final ReentrantLock lock = this.lock;
 355:         lock.lock();
 356:         try {
 357:             return (count == 0) ? null : items[takeIndex];
 358:         } finally {
 359:             lock.unlock();
 360:         }
 361:     }
 362: 
 363:     // this doc comment is overridden to remove the reference to collections
 364:     // greater in size than Integer.MAX_VALUE
 365:     /**
 366:      * Returns the number of elements in this queue.
 367:      *
 368:      * @return the number of elements in this queue
 369:      */
 370:     public int size() {
 371:         final ReentrantLock lock = this.lock;
 372:         lock.lock();
 373:         try {
 374:             return count;
 375:         } finally {
 376:             lock.unlock();
 377:         }
 378:     }
 379: 
 380:     // this doc comment is a modified copy of the inherited doc comment,
 381:     // without the reference to unlimited queues.
 382:     /**
 383:      * Returns the number of additional elements that this queue can ideally
 384:      * (in the absence of memory or resource constraints) accept without
 385:      * blocking. This is always equal to the initial capacity of this queue
 386:      * less the current <tt>size</tt> of this queue.
 387:      *
 388:      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
 389:      * an element will succeed by inspecting <tt>remainingCapacity</tt>
 390:      * because it may be the case that another thread is about to
 391:      * insert or remove an element.
 392:      */
 393:     public int remainingCapacity() {
 394:         final ReentrantLock lock = this.lock;
 395:         lock.lock();
 396:         try {
 397:             return items.length - count;
 398:         } finally {
 399:             lock.unlock();
 400:         }
 401:     }
 402: 
 403:     /**
 404:      * Removes a single instance of the specified element from this queue,
 405:      * if it is present.  More formally, removes an element <tt>e</tt> such
 406:      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
 407:      * elements.
 408:      * Returns <tt>true</tt> if this queue contained the specified element
 409:      * (or equivalently, if this queue changed as a result of the call).
 410:      *
 411:      * @param o element to be removed from this queue, if present
 412:      * @return <tt>true</tt> if this queue changed as a result of the call
 413:      */
 414:     public boolean remove(Object o) {
 415:         if (o == null) return false;
 416:         final E[] items = this.items;
 417:         final ReentrantLock lock = this.lock;
 418:         lock.lock();
 419:         try {
 420:             int i = takeIndex;
 421:             int k = 0;
 422:             for (;;) {
 423:                 if (k++ >= count)
 424:                     return false;
 425:                 if (o.equals(items[i])) {
 426:                     removeAt(i);
 427:                     return true;
 428:                 }
 429:                 i = inc(i);
 430:             }
 431: 
 432:         } finally {
 433:             lock.unlock();
 434:         }
 435:     }
 436: 
 437:     /**
 438:      * Returns <tt>true</tt> if this queue contains the specified element.
 439:      * More formally, returns <tt>true</tt> if and only if this queue contains
 440:      * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
 441:      *
 442:      * @param o object to be checked for containment in this queue
 443:      * @return <tt>true</tt> if this queue contains the specified element
 444:      */
 445:     public boolean contains(Object o) {
 446:         if (o == null) return false;
 447:         final E[] items = this.items;
 448:         final ReentrantLock lock = this.lock;
 449:         lock.lock();
 450:         try {
 451:             int i = takeIndex;
 452:             int k = 0;
 453:             while (k++ < count) {
 454:                 if (o.equals(items[i]))
 455:                     return true;
 456:                 i = inc(i);
 457:             }
 458:             return false;
 459:         } finally {
 460:             lock.unlock();
 461:         }
 462:     }
 463: 
 464:     /**
 465:      * Returns an array containing all of the elements in this queue, in
 466:      * proper sequence.
 467:      *
 468:      * <p>The returned array will be "safe" in that no references to it are
 469:      * maintained by this queue.  (In other words, this method must allocate
 470:      * a new array).  The caller is thus free to modify the returned array.
 471:      *
 472:      * <p>This method acts as bridge between array-based and collection-based
 473:      * APIs.
 474:      *
 475:      * @return an array containing all of the elements in this queue
 476:      */
 477:     public Object[] toArray() {
 478:         final E[] items = this.items;
 479:         final ReentrantLock lock = this.lock;
 480:         lock.lock();
 481:         try {
 482:             Object[] a = new Object[count];
 483:             int k = 0;
 484:             int i = takeIndex;
 485:             while (k < count) {
 486:                 a[k++] = items[i];
 487:                 i = inc(i);
 488:             }
 489:             return a;
 490:         } finally {
 491:             lock.unlock();
 492:         }
 493:     }
 494: 
 495:     /**
 496:      * Returns an array containing all of the elements in this queue, in
 497:      * proper sequence; the runtime type of the returned array is that of
 498:      * the specified array.  If the queue fits in the specified array, it
 499:      * is returned therein.  Otherwise, a new array is allocated with the
 500:      * runtime type of the specified array and the size of this queue.
 501:      *
 502:      * <p>If this queue fits in the specified array with room to spare
 503:      * (i.e., the array has more elements than this queue), the element in
 504:      * the array immediately following the end of the queue is set to
 505:      * <tt>null</tt>.
 506:      *
 507:      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 508:      * array-based and collection-based APIs.  Further, this method allows
 509:      * precise control over the runtime type of the output array, and may,
 510:      * under certain circumstances, be used to save allocation costs.
 511:      *
 512:      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
 513:      * The following code can be used to dump the queue into a newly
 514:      * allocated array of <tt>String</tt>:
 515:      *
 516:      * <pre>
 517:      *     String[] y = x.toArray(new String[0]);</pre>
 518:      *
 519:      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
 520:      * <tt>toArray()</tt>.
 521:      *
 522:      * @param a the array into which the elements of the queue are to
 523:      *          be stored, if it is big enough; otherwise, a new array of the
 524:      *          same runtime type is allocated for this purpose
 525:      * @return an array containing all of the elements in this queue
 526:      * @throws ArrayStoreException if the runtime type of the specified array
 527:      *         is not a supertype of the runtime type of every element in
 528:      *         this queue
 529:      * @throws NullPointerException if the specified array is null
 530:      */
 531:     public <T> T[] toArray(T[] a) {
 532:         final E[] items = this.items;
 533:         final ReentrantLock lock = this.lock;
 534:         lock.lock();
 535:         try {
 536:             if (a.length < count)
 537:                 a = (T[])java.lang.reflect.Array.newInstance(
 538:                     a.getClass().getComponentType(),
 539:                     count
 540:                     );
 541: 
 542:             int k = 0;
 543:             int i = takeIndex;
 544:             while (k < count) {
 545:                 a[k++] = (T)items[i];
 546:                 i = inc(i);
 547:             }
 548:             if (a.length > count)
 549:                 a[count] = null;
 550:             return a;
 551:         } finally {
 552:             lock.unlock();
 553:         }
 554:     }
 555: 
 556:     public String toString() {
 557:         final ReentrantLock lock = this.lock;
 558:         lock.lock();
 559:         try {
 560:             return super.toString();
 561:         } finally {
 562:             lock.unlock();
 563:         }
 564:     }
 565: 
 566:     /**
 567:      * Atomically removes all of the elements from this queue.
 568:      * The queue will be empty after this call returns.
 569:      */
 570:     public void clear() {
 571:         final E[] items = this.items;
 572:         final ReentrantLock lock = this.lock;
 573:         lock.lock();
 574:         try {
 575:             int i = takeIndex;
 576:             int k = count;
 577:             while (k-- > 0) {
 578:                 items[i] = null;
 579:                 i = inc(i);
 580:             }
 581:             count = 0;
 582:             putIndex = 0;
 583:             takeIndex = 0;
 584:             notFull.signalAll();
 585:         } finally {
 586:             lock.unlock();
 587:         }
 588:     }
 589: 
 590:     /**
 591:      * @throws UnsupportedOperationException {@inheritDoc}
 592:      * @throws ClassCastException            {@inheritDoc}
 593:      * @throws NullPointerException          {@inheritDoc}
 594:      * @throws IllegalArgumentException      {@inheritDoc}
 595:      */
 596:     public int drainTo(Collection<? super E> c) {
 597:         if (c == null)
 598:             throw new NullPointerException();
 599:         if (c == this)
 600:             throw new IllegalArgumentException();
 601:         final E[] items = this.items;
 602:         final ReentrantLock lock = this.lock;
 603:         lock.lock();
 604:         try {
 605:             int i = takeIndex;
 606:             int n = 0;
 607:             int max = count;
 608:             while (n < max) {
 609:                 c.add(items[i]);
 610:                 items[i] = null;
 611:                 i = inc(i);
 612:                 ++n;
 613:             }
 614:             if (n > 0) {
 615:                 count = 0;
 616:                 putIndex = 0;
 617:                 takeIndex = 0;
 618:                 notFull.signalAll();
 619:             }
 620:             return n;
 621:         } finally {
 622:             lock.unlock();
 623:         }
 624:     }
 625: 
 626:     /**
 627:      * @throws UnsupportedOperationException {@inheritDoc}
 628:      * @throws ClassCastException            {@inheritDoc}
 629:      * @throws NullPointerException          {@inheritDoc}
 630:      * @throws IllegalArgumentException      {@inheritDoc}
 631:      */
 632:     public int drainTo(Collection<? super E> c, int maxElements) {
 633:         if (c == null)
 634:             throw new NullPointerException();
 635:         if (c == this)
 636:             throw new IllegalArgumentException();
 637:         if (maxElements <= 0)
 638:             return 0;
 639:         final E[] items = this.items;
 640:         final ReentrantLock lock = this.lock;
 641:         lock.lock();
 642:         try {
 643:             int i = takeIndex;
 644:             int n = 0;
 645:             int sz = count;
 646:             int max = (maxElements < count)? maxElements : count;
 647:             while (n < max) {
 648:                 c.add(items[i]);
 649:                 items[i] = null;
 650:                 i = inc(i);
 651:                 ++n;
 652:             }
 653:             if (n > 0) {
 654:                 count -= n;
 655:                 takeIndex = i;
 656:                 notFull.signalAll();
 657:             }
 658:             return n;
 659:         } finally {
 660:             lock.unlock();
 661:         }
 662:     }
 663: 
 664: 
 665:     /**
 666:      * Returns an iterator over the elements in this queue in proper sequence.
 667:      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
 668:      * will never throw {@link ConcurrentModificationException},
 669:      * and guarantees to traverse elements as they existed upon
 670:      * construction of the iterator, and may (but is not guaranteed to)
 671:      * reflect any modifications subsequent to construction.
 672:      *
 673:      * @return an iterator over the elements in this queue in proper sequence
 674:      */
 675:     public Iterator<E> iterator() {
 676:         final ReentrantLock lock = this.lock;
 677:         lock.lock();
 678:         try {
 679:             return new Itr();
 680:         } finally {
 681:             lock.unlock();
 682:         }
 683:     }
 684: 
 685:     /**
 686:      * Iterator for ArrayBlockingQueue
 687:      */
 688:     private class Itr implements Iterator<E> {
 689:         /**
 690:          * Index of element to be returned by next,
 691:          * or a negative number if no such.
 692:          */
 693:         private int nextIndex;
 694: 
 695:         /**
 696:          * nextItem holds on to item fields because once we claim
 697:          * that an element exists in hasNext(), we must return it in
 698:          * the following next() call even if it was in the process of
 699:          * being removed when hasNext() was called.
 700:          */
 701:         private E nextItem;
 702: 
 703:         /**
 704:          * Index of element returned by most recent call to next.
 705:          * Reset to -1 if this element is deleted by a call to remove.
 706:          */
 707:         private int lastRet;
 708: 
 709:         Itr() {
 710:             lastRet = -1;
 711:             if (count == 0)
 712:                 nextIndex = -1;
 713:             else {
 714:                 nextIndex = takeIndex;
 715:                 nextItem = items[takeIndex];
 716:             }
 717:         }
 718: 
 719:         public boolean hasNext() {
 720:             /*
 721:              * No sync. We can return true by mistake here
 722:              * only if this iterator passed across threads,
 723:              * which we don't support anyway.
 724:              */
 725:             return nextIndex >= 0;
 726:         }
 727: 
 728:         /**
 729:          * Checks whether nextIndex is valid; if so setting nextItem.
 730:          * Stops iterator when either hits putIndex or sees null item.
 731:          */
 732:         private void checkNext() {
 733:             if (nextIndex == putIndex) {
 734:                 nextIndex = -1;
 735:                 nextItem = null;
 736:             } else {
 737:                 nextItem = items[nextIndex];
 738:                 if (nextItem == null)
 739:                     nextIndex = -1;
 740:             }
 741:         }
 742: 
 743:         public E next() {
 744:             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
 745:             lock.lock();
 746:             try {
 747:                 if (nextIndex < 0)
 748:                     throw new NoSuchElementException();
 749:                 lastRet = nextIndex;
 750:                 E x = nextItem;
 751:                 nextIndex = inc(nextIndex);
 752:                 checkNext();
 753:                 return x;
 754:             } finally {
 755:                 lock.unlock();
 756:             }
 757:         }
 758: 
 759:         public void remove() {
 760:             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
 761:             lock.lock();
 762:             try {
 763:                 int i = lastRet;
 764:                 if (i == -1)
 765:                     throw new IllegalStateException();
 766:                 lastRet = -1;
 767: 
 768:                 int ti = takeIndex;
 769:                 removeAt(i);
 770:                 // back up cursor (reset to front if was first element)
 771:                 nextIndex = (i == ti) ? takeIndex : i;
 772:                 checkNext();
 773:             } finally {
 774:                 lock.unlock();
 775:             }
 776:         }
 777:     }
 778: }