001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.Collection;
018    import java.util.concurrent.TimeUnit;
019    import java.util.concurrent.atomic.AtomicInteger;
020    import java.util.concurrent.locks.Condition;
021    import java.util.concurrent.locks.ReentrantLock;
022    
023    /**
024     * @author Shuyang Zhou
025     */
026    public class TaskQueue<E> {
027    
028            public TaskQueue() {
029                    this(Integer.MAX_VALUE);
030            }
031    
032            public TaskQueue(int capacity) {
033                    if (capacity <= 0) {
034                            throw new IllegalArgumentException();
035                    }
036    
037                    _capacity = capacity;
038    
039                    _headNode = new Node<E>(null);
040                    _tailNode = _headNode;
041    
042                    _notEmptyCondition = _takeLock.newCondition();
043            }
044    
045            public int drainTo(Collection<E> collection) {
046                    if (collection == null) {
047                            throw new NullPointerException();
048                    }
049    
050                    _takeLock.lock();
051    
052                    try {
053                            Node<E> headNode = _headNode;
054    
055                            int size = _count.get();
056    
057                            int count = 0;
058    
059                            try {
060                                    while (count < size) {
061                                            Node<E> currentNode = headNode._nextNode;
062    
063                                            collection.add(currentNode._element);
064    
065                                            currentNode._element = null;
066    
067                                            headNode._nextNode = null;
068    
069                                            headNode = currentNode;
070    
071                                            count++;
072                                    }
073    
074                                    return count;
075                            }
076                            finally {
077                                    if (count > 0) {
078                                            _headNode = headNode;
079    
080                                            _count.getAndAdd(-count);
081                                    }
082                            }
083                    }
084                    finally {
085                            _takeLock.unlock();
086                    }
087            }
088    
089            public boolean isEmpty() {
090                    if (_count.get() == 0) {
091                            return true;
092                    }
093                    else {
094                            return false;
095                    }
096            }
097    
098            public boolean offer(E element, boolean[] hasWaiterMarker) {
099                    if ((element == null) || (hasWaiterMarker == null)) {
100                            throw new NullPointerException();
101                    }
102    
103                    if (hasWaiterMarker.length == 0) {
104                            throw new IllegalArgumentException();
105                    }
106    
107                    if (_count.get() == _capacity) {
108                            return false;
109                    }
110    
111                    int count = -1;
112    
113                    _putLock.lock();
114    
115                    try {
116    
117                            // Take a snapshot of count before enqueue
118    
119                            count = _count.get();
120    
121                            if (count < _capacity) {
122                                    _enqueue(element);
123    
124                                    _count.getAndIncrement();
125    
126                                    if (count == 0) {
127    
128                                            // Signal takers right after enqueue to increase the
129                                            // possibility of a concurrent token
130    
131                                            _takeLock.lock();
132    
133                                            try {
134                                                    _notEmptyCondition.signal();
135                                            }
136                                            finally {
137                                                    _takeLock.unlock();
138                                            }
139                                    }
140    
141                                    // After enqueue, a non-increasing count implies a concurrent
142                                    // token because there are spare threads
143    
144                                    if (count >= _count.get()) {
145                                            hasWaiterMarker[0] = true;
146                                    }
147                            }
148                    }
149                    finally {
150                            _putLock.unlock();
151                    }
152    
153                    return count >= 0;
154            }
155    
156            public E poll() {
157                    if (_count.get() == 0) {
158                            return null;
159                    }
160    
161                    E element = null;
162    
163                    _takeLock.lock();
164    
165                    try {
166                            if (_count.get() > 0) {
167                                    element = _dequeue();
168    
169                                    if (_count.getAndDecrement() > 1) {
170                                            _notEmptyCondition.signal();
171                                    }
172                            }
173                    }
174                    finally {
175                            _takeLock.unlock();
176                    }
177    
178                    return element;
179            }
180    
181            public E poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
182                    E element = null;
183    
184                    long nanos = timeUnit.toNanos(timeout);
185    
186                    _takeLock.lockInterruptibly();
187    
188                    try {
189                            while (_count.get() == 0) {
190                                    if (nanos <= 0) {
191                                            return null;
192                                    }
193    
194                                    nanos = _notEmptyCondition.awaitNanos(nanos);
195                            }
196    
197                            element = _dequeue();
198    
199                            if (_count.getAndDecrement() > 1) {
200                                    _notEmptyCondition.signal();
201                            }
202                    }
203                    finally {
204                            _takeLock.unlock();
205                    }
206    
207                    return element;
208            }
209    
210            public int remainingCapacity() {
211                    return _capacity - _count.get();
212            }
213    
214            public boolean remove(E element) {
215                    if (element == null) {
216                            return false;
217                    }
218    
219                    _fullyLock();
220    
221                    try {
222                            Node<E> previousNode = _headNode;
223                            Node<E> currentNode = previousNode._nextNode;
224    
225                            while (currentNode != null) {
226                                    if (element.equals(currentNode._element)) {
227                                            _unlink(currentNode, previousNode);
228    
229                                            return true;
230                                    }
231    
232                                    previousNode = currentNode;
233                                    currentNode = currentNode._nextNode;
234                            }
235    
236                            return false;
237                    }
238                    finally {
239                            _fullyUnlock();
240                    }
241            }
242    
243            public int size() {
244                    return _count.get();
245            }
246    
247            public E take() throws InterruptedException {
248                    E element = null;
249    
250                    _takeLock.lockInterruptibly();
251    
252                    try {
253                            while (_count.get() == 0) {
254                                    _notEmptyCondition.await();
255                            }
256    
257                            element = _dequeue();
258    
259                            if (_count.getAndDecrement() > 1) {
260                                    _notEmptyCondition.signal();
261                            }
262                    }
263                    finally {
264                            _takeLock.unlock();
265                    }
266    
267                    return element;
268            }
269    
270            protected ReentrantLock getPutLock() {
271                    return _putLock;
272            }
273    
274            protected ReentrantLock getTakeLock() {
275                    return _takeLock;
276            }
277    
278            private E _dequeue() {
279                    Node<E> headNode = _headNode;
280                    Node<E> firstNode = headNode._nextNode;
281    
282                    headNode._nextNode = null;
283    
284                    _headNode = firstNode;
285    
286                    E element = firstNode._element;
287    
288                    firstNode._element = null;
289    
290                    return element;
291            }
292    
293            private void _enqueue(E element) {
294                    _tailNode._nextNode = new Node<E>(element);
295    
296                    _tailNode = _tailNode._nextNode;
297            }
298    
299            private void _fullyLock() {
300                    _putLock.lock();
301                    _takeLock.lock();
302            }
303    
304            private void _fullyUnlock() {
305                    _takeLock.unlock();
306                    _putLock.unlock();
307            }
308    
309            private void _unlink(Node<E> currentNode, Node<E> previousNode) {
310                    currentNode._element = null;
311                    previousNode._nextNode = currentNode._nextNode;
312    
313                    if (_tailNode == currentNode) {
314                            _tailNode = previousNode;
315                    }
316    
317                    _count.getAndDecrement();
318            }
319    
320            private final int _capacity;
321            private final AtomicInteger _count = new AtomicInteger();
322            private Node<E> _headNode;
323            private final Condition _notEmptyCondition;
324            private final ReentrantLock _putLock = new ReentrantLock();
325            private Node<E> _tailNode;
326            private final ReentrantLock _takeLock = new ReentrantLock(true);
327    
328            private static class Node<E> {
329    
330                    private Node(E element) {
331                            _element = element;
332                    }
333    
334                    private E _element;
335                    private Node<E> _nextNode;
336    
337            }
338    
339    }