1:
6:
7: package ;
8: import ;
9: import ;
10: import ;
11:
12:
44: public class LinkedBlockingQueue<E> extends AbstractQueue<E>
45: implements BlockingQueue<E>, java.io.Serializable {
46: private static final long serialVersionUID = -6903933977591709194L;
47:
48:
61:
62:
65: static class Node<E> {
66:
67: volatile E item;
68: Node<E> next;
69: Node(E x) { item = x; }
70: }
71:
72:
73: private final int capacity;
74:
75:
76: private final AtomicInteger count = new AtomicInteger(0);
77:
78:
79: private transient Node<E> head;
80:
81:
82: private transient Node<E> last;
83:
84:
85: private final ReentrantLock takeLock = new ReentrantLock();
86:
87:
88: private final Condition notEmpty = takeLock.newCondition();
89:
90:
91: private final ReentrantLock putLock = new ReentrantLock();
92:
93:
94: private final Condition notFull = putLock.newCondition();
95:
96:
100: private void signalNotEmpty() {
101: final ReentrantLock takeLock = this.takeLock;
102: takeLock.lock();
103: try {
104: notEmpty.signal();
105: } finally {
106: takeLock.unlock();
107: }
108: }
109:
110:
113: private void signalNotFull() {
114: final ReentrantLock putLock = this.putLock;
115: putLock.lock();
116: try {
117: notFull.signal();
118: } finally {
119: putLock.unlock();
120: }
121: }
122:
123:
127: private void insert(E x) {
128: last = last.next = new Node<E>(x);
129: }
130:
131:
135: private E extract() {
136: Node<E> first = head.next;
137: head = first;
138: E x = first.item;
139: first.item = null;
140: return x;
141: }
142:
143:
146: private void fullyLock() {
147: putLock.lock();
148: takeLock.lock();
149: }
150:
151:
154: private void fullyUnlock() {
155: takeLock.unlock();
156: putLock.unlock();
157: }
158:
159:
160:
164: public LinkedBlockingQueue() {
165: this(Integer.MAX_VALUE);
166: }
167:
168:
175: public LinkedBlockingQueue(int capacity) {
176: if (capacity <= 0) throw new IllegalArgumentException();
177: this.capacity = capacity;
178: last = head = new Node<E>(null);
179: }
180:
181:
191: public LinkedBlockingQueue(Collection<? extends E> c) {
192: this(Integer.MAX_VALUE);
193: for (E e : c)
194: add(e);
195: }
196:
197:
198:
199:
200:
205: public int size() {
206: return count.get();
207: }
208:
209:
210:
211:
222: public int remainingCapacity() {
223: return capacity - count.get();
224: }
225:
226:
233: public void put(E e) throws InterruptedException {
234: if (e == null) throw new NullPointerException();
235:
236:
237: int c = -1;
238: final ReentrantLock putLock = this.putLock;
239: final AtomicInteger count = this.count;
240: putLock.lockInterruptibly();
241: try {
242:
251: try {
252: while (count.get() == capacity)
253: notFull.await();
254: } catch (InterruptedException ie) {
255: notFull.signal();
256: throw ie;
257: }
258: insert(e);
259: c = count.getAndIncrement();
260: if (c + 1 < capacity)
261: notFull.signal();
262: } finally {
263: putLock.unlock();
264: }
265: if (c == 0)
266: signalNotEmpty();
267: }
268:
269:
278: public boolean offer(E e, long timeout, TimeUnit unit)
279: throws InterruptedException {
280:
281: if (e == null) throw new NullPointerException();
282: long nanos = unit.toNanos(timeout);
283: int c = -1;
284: final ReentrantLock putLock = this.putLock;
285: final AtomicInteger count = this.count;
286: putLock.lockInterruptibly();
287: try {
288: for (;;) {
289: if (count.get() < capacity) {
290: insert(e);
291: c = count.getAndIncrement();
292: if (c + 1 < capacity)
293: notFull.signal();
294: break;
295: }
296: if (nanos <= 0)
297: return false;
298: try {
299: nanos = notFull.awaitNanos(nanos);
300: } catch (InterruptedException ie) {
301: notFull.signal();
302: throw ie;
303: }
304: }
305: } finally {
306: putLock.unlock();
307: }
308: if (c == 0)
309: signalNotEmpty();
310: return true;
311: }
312:
313:
324: public boolean offer(E e) {
325: if (e == null) throw new NullPointerException();
326: final AtomicInteger count = this.count;
327: if (count.get() == capacity)
328: return false;
329: int c = -1;
330: final ReentrantLock putLock = this.putLock;
331: putLock.lock();
332: try {
333: if (count.get() < capacity) {
334: insert(e);
335: c = count.getAndIncrement();
336: if (c + 1 < capacity)
337: notFull.signal();
338: }
339: } finally {
340: putLock.unlock();
341: }
342: if (c == 0)
343: signalNotEmpty();
344: return c >= 0;
345: }
346:
347:
348: public E take() throws InterruptedException {
349: E x;
350: int c = -1;
351: final AtomicInteger count = this.count;
352: final ReentrantLock takeLock = this.takeLock;
353: takeLock.lockInterruptibly();
354: try {
355: try {
356: while (count.get() == 0)
357: notEmpty.await();
358: } catch (InterruptedException ie) {
359: notEmpty.signal();
360: throw ie;
361: }
362:
363: x = extract();
364: c = count.getAndDecrement();
365: if (c > 1)
366: notEmpty.signal();
367: } finally {
368: takeLock.unlock();
369: }
370: if (c == capacity)
371: signalNotFull();
372: return x;
373: }
374:
375: public E poll(long timeout, TimeUnit unit) throws InterruptedException {
376: E x = null;
377: int c = -1;
378: long nanos = unit.toNanos(timeout);
379: final AtomicInteger count = this.count;
380: final ReentrantLock takeLock = this.takeLock;
381: takeLock.lockInterruptibly();
382: try {
383: for (;;) {
384: if (count.get() > 0) {
385: x = extract();
386: c = count.getAndDecrement();
387: if (c > 1)
388: notEmpty.signal();
389: break;
390: }
391: if (nanos <= 0)
392: return null;
393: try {
394: nanos = notEmpty.awaitNanos(nanos);
395: } catch (InterruptedException ie) {
396: notEmpty.signal();
397: throw ie;
398: }
399: }
400: } finally {
401: takeLock.unlock();
402: }
403: if (c == capacity)
404: signalNotFull();
405: return x;
406: }
407:
408: public E poll() {
409: final AtomicInteger count = this.count;
410: if (count.get() == 0)
411: return null;
412: E x = null;
413: int c = -1;
414: final ReentrantLock takeLock = this.takeLock;
415: takeLock.lock();
416: try {
417: if (count.get() > 0) {
418: x = extract();
419: c = count.getAndDecrement();
420: if (c > 1)
421: notEmpty.signal();
422: }
423: } finally {
424: takeLock.unlock();
425: }
426: if (c == capacity)
427: signalNotFull();
428: return x;
429: }
430:
431:
432: public E peek() {
433: if (count.get() == 0)
434: return null;
435: final ReentrantLock takeLock = this.takeLock;
436: takeLock.lock();
437: try {
438: Node<E> first = head.next;
439: if (first == null)
440: return null;
441: else
442: return first.item;
443: } finally {
444: takeLock.unlock();
445: }
446: }
447:
448:
459: public boolean remove(Object o) {
460: if (o == null) return false;
461: boolean removed = false;
462: fullyLock();
463: try {
464: Node<E> trail = head;
465: Node<E> p = head.next;
466: while (p != null) {
467: if (o.equals(p.item)) {
468: removed = true;
469: break;
470: }
471: trail = p;
472: p = p.next;
473: }
474: if (removed) {
475: p.item = null;
476: trail.next = p.next;
477: if (last == p)
478: last = trail;
479: if (count.getAndDecrement() == capacity)
480: notFull.signalAll();
481: }
482: } finally {
483: fullyUnlock();
484: }
485: return removed;
486: }
487:
488:
501: public Object[] toArray() {
502: fullyLock();
503: try {
504: int size = count.get();
505: Object[] a = new Object[size];
506: int k = 0;
507: for (Node<E> p = head.next; p != null; p = p.next)
508: a[k++] = p.item;
509: return a;
510: } finally {
511: fullyUnlock();
512: }
513: }
514:
515:
551: public <T> T[] toArray(T[] a) {
552: fullyLock();
553: try {
554: int size = count.get();
555: if (a.length < size)
556: a = (T[])java.lang.reflect.Array.newInstance
557: (a.getClass().getComponentType(), size);
558:
559: int k = 0;
560: for (Node p = head.next; p != null; p = p.next)
561: a[k++] = (T)p.item;
562: if (a.length > k)
563: a[k] = null;
564: return a;
565: } finally {
566: fullyUnlock();
567: }
568: }
569:
570: public String toString() {
571: fullyLock();
572: try {
573: return super.toString();
574: } finally {
575: fullyUnlock();
576: }
577: }
578:
579:
583: public void clear() {
584: fullyLock();
585: try {
586: head.next = null;
587: assert head.item == null;
588: last = head;
589: if (count.getAndSet(0) == capacity)
590: notFull.signalAll();
591: } finally {
592: fullyUnlock();
593: }
594: }
595:
596:
602: public int drainTo(Collection<? super E> c) {
603: if (c == null)
604: throw new NullPointerException();
605: if (c == this)
606: throw new IllegalArgumentException();
607: Node<E> first;
608: fullyLock();
609: try {
610: first = head.next;
611: head.next = null;
612: assert head.item == null;
613: last = head;
614: if (count.getAndSet(0) == capacity)
615: notFull.signalAll();
616: } finally {
617: fullyUnlock();
618: }
619:
620: int n = 0;
621: for (Node<E> p = first; p != null; p = p.next) {
622: c.add(p.item);
623: p.item = null;
624: ++n;
625: }
626: return n;
627: }
628:
629:
635: public int drainTo(Collection<? super E> c, int maxElements) {
636: if (c == null)
637: throw new NullPointerException();
638: if (c == this)
639: throw new IllegalArgumentException();
640: fullyLock();
641: try {
642: int n = 0;
643: Node<E> p = head.next;
644: while (p != null && n < maxElements) {
645: c.add(p.item);
646: p.item = null;
647: p = p.next;
648: ++n;
649: }
650: if (n != 0) {
651: head.next = p;
652: assert head.item == null;
653: if (p == null)
654: last = head;
655: if (count.getAndAdd(-n) == capacity)
656: notFull.signalAll();
657: }
658: return n;
659: } finally {
660: fullyUnlock();
661: }
662: }
663:
664:
674: public Iterator<E> iterator() {
675: return new Itr();
676: }
677:
678: private class Itr implements Iterator<E> {
679:
684: private Node<E> current;
685: private Node<E> lastRet;
686: private E currentElement;
687:
688: Itr() {
689: final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
690: final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
691: putLock.lock();
692: takeLock.lock();
693: try {
694: current = head.next;
695: if (current != null)
696: currentElement = current.item;
697: } finally {
698: takeLock.unlock();
699: putLock.unlock();
700: }
701: }
702:
703: public boolean hasNext() {
704: return current != null;
705: }
706:
707: public E next() {
708: final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
709: final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
710: putLock.lock();
711: takeLock.lock();
712: try {
713: if (current == null)
714: throw new NoSuchElementException();
715: E x = currentElement;
716: lastRet = current;
717: current = current.next;
718: if (current != null)
719: currentElement = current.item;
720: return x;
721: } finally {
722: takeLock.unlock();
723: putLock.unlock();
724: }
725: }
726:
727: public void remove() {
728: if (lastRet == null)
729: throw new IllegalStateException();
730: final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
731: final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
732: putLock.lock();
733: takeLock.lock();
734: try {
735: Node<E> node = lastRet;
736: lastRet = null;
737: Node<E> trail = head;
738: Node<E> p = head.next;
739: while (p != null && p != node) {
740: trail = p;
741: p = p.next;
742: }
743: if (p == node) {
744: p.item = null;
745: trail.next = p.next;
746: if (last == p)
747: last = trail;
748: int c = count.getAndDecrement();
749: if (c == capacity)
750: notFull.signalAll();
751: }
752: } finally {
753: takeLock.unlock();
754: putLock.unlock();
755: }
756: }
757: }
758:
759:
767: private void writeObject(java.io.ObjectOutputStream s)
768: throws java.io.IOException {
769:
770: fullyLock();
771: try {
772:
773: s.defaultWriteObject();
774:
775:
776: for (Node<E> p = head.next; p != null; p = p.next)
777: s.writeObject(p.item);
778:
779:
780: s.writeObject(null);
781: } finally {
782: fullyUnlock();
783: }
784: }
785:
786:
791: private void readObject(java.io.ObjectInputStream s)
792: throws java.io.IOException, ClassNotFoundException {
793:
794: s.defaultReadObject();
795:
796: count.set(0);
797: last = head = new Node<E>(null);
798:
799:
800: for (;;) {
801: E item = (E)s.readObject();
802: if (item == null)
803: break;
804: add(item);
805: }
806: }
807: }