001    /**
002     * Copyright (c) 2000-2011 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.Collection;
018    import java.util.concurrent.TimeUnit;
019    import java.util.concurrent.atomic.AtomicInteger;
020    import java.util.concurrent.locks.Condition;
021    import java.util.concurrent.locks.ReentrantLock;
022    
/**
 * A capacity-bounded, singly linked FIFO queue.
 *
 * <p>
 * Insertions are serialized by a put lock and removals by a separate take
 * lock; the element count is kept in an {@link AtomicInteger} that both sides
 * read and update. Consumers that find the queue empty wait on a "not empty"
 * condition bound to the take lock. Insertion never blocks: {@link
 * #offer(Object, boolean[])} simply fails when the queue is full.
 * </p>
 *
 * <p>
 * The list always starts with a dummy node whose element is <code>null</code>;
 * the first real element is the dummy node's successor.
 * </p>
 *
 * @author Shuyang Zhou
 */
public class TaskQueue<E> {

	/**
	 * Creates a queue with a capacity of {@link Integer#MAX_VALUE}.
	 */
	public TaskQueue() {
		this(Integer.MAX_VALUE);
	}

	/**
	 * Creates a queue that holds at most <code>capacity</code> elements.
	 *
	 * @param  capacity the maximum number of elements, must be positive
	 * @throws IllegalArgumentException if <code>capacity</code> is zero or
	 *         negative
	 */
	public TaskQueue(int capacity) {
		if (capacity <= 0) {
			throw new IllegalArgumentException(
				"Capacity must be greater than 0: " + capacity);
		}

		_capacity = capacity;
		_headNode = new Node<E>(null);
		_tailNode = _headNode;
	}

	/**
	 * Moves the elements that are in the queue when this method starts into
	 * the collection, in queue order.
	 *
	 * <p>
	 * If <code>collection.add</code> throws, the elements that were already
	 * added stay removed from the queue, the failing element and everything
	 * after it stay queued, and the exception propagates to the caller.
	 * </p>
	 *
	 * @param  collection the collection receiving the elements
	 * @return the number of elements moved
	 * @throws NullPointerException if <code>collection</code> is
	 *         <code>null</code>
	 */
	public int drainTo(Collection<E> collection) {
		if (collection == null) {
			throw new NullPointerException("Collection is null");
		}

		_takeLock.lock();

		try {
			Node<E> headNode = _headNode;

			// Snapshot the size once. Elements offered while draining are
			// left for a later call.

			int size = _count.get();

			int count = 0;

			try {
				while (count < size) {
					Node<E> currentNode = headNode._nextNode;

					// Add before touching the links so that a failing add
					// leaves this node fully intact in the queue

					collection.add(currentNode._element);

					// The drained node becomes the new dummy head

					currentNode._element = null;

					headNode._nextNode = null;

					headNode = currentNode;

					count++;
				}

				return count;
			}
			finally {

				// Runs on both the normal and the exceptional path, so the
				// head and the count always reflect what was really moved

				if (count > 0) {
					_headNode = headNode;

					_count.getAndAdd(-count);
				}
			}
		}
		finally {
			_takeLock.unlock();
		}
	}

	/**
	 * Returns <code>true</code> if the element count is currently zero.
	 *
	 * @return <code>true</code> if the queue holds no elements
	 */
	public boolean isEmpty() {
		return _count.get() == 0;
	}

	/**
	 * Appends the element to the tail of the queue if the queue is not full.
	 * This method never waits for space.
	 *
	 * <p>
	 * When the element is inserted, <code>hasWaiterMarker[0]</code> is
	 * overwritten. It is set to <code>true</code> if a thread is waiting on the
	 * not empty condition, or if the count read after the insertion is not
	 * greater than the count read just before it, which means at least one
	 * element was removed in between. Otherwise it is set to
	 * <code>false</code>. When the element is rejected the marker is left
	 * untouched.
	 * </p>
	 *
	 * @param  element the element to insert
	 * @param  hasWaiterMarker a non-empty array whose first slot receives the
	 *         waiter flag
	 * @return <code>true</code> if the element was inserted;
	 *         <code>false</code> if the queue was full
	 * @throws IllegalArgumentException if <code>hasWaiterMarker</code> is
	 *         empty
	 * @throws NullPointerException if <code>element</code> or
	 *         <code>hasWaiterMarker</code> is <code>null</code>
	 */
	public boolean offer(E element, boolean[] hasWaiterMarker) {
		if (element == null) {
			throw new NullPointerException("Element is null");
		}

		if (hasWaiterMarker == null) {
			throw new NullPointerException("Has waiter marker is null");
		}

		if (hasWaiterMarker.length == 0) {
			throw new IllegalArgumentException("Has waiter marker is empty");
		}

		// Cheap rejection without taking the put lock

		if (_count.get() == _capacity) {
			return false;
		}

		// Stays negative when the element could not be inserted

		int count = -1;

		_putLock.lock();

		try {

			// Check again under the lock, another producer may have filled
			// the queue in the meantime

			if (_count.get() < _capacity) {
				_enqueue(element);

				count = _count.getAndIncrement();

				// Put lock first, take lock second, the same order as
				// _fullyLock()

				_takeLock.lock();

				try {
					boolean hasWaiter = _takeLock.hasWaiters(
						_notEmptyCondition);

					if (!hasWaiter && (count >= _count.get())) {

						// The count did not grow past its previous value, so
						// an element was taken since the insertion

						hasWaiter = true;
					}

					hasWaiterMarker[0] = hasWaiter;
				}
				finally {
					_takeLock.unlock();
				}
			}
		}
		finally {
			_putLock.unlock();
		}

		// The queue was empty before this insertion, wake up one consumer

		if (count == 0) {
			_signalNotEmpty();
		}

		return count >= 0;
	}

	/**
	 * Removes and returns the head of the queue without waiting.
	 *
	 * @return the head element, or <code>null</code> if the queue is empty
	 */
	public E poll() {

		// Cheap exit without taking the take lock

		if (_count.get() == 0) {
			return null;
		}

		_takeLock.lock();

		try {
			if (_count.get() > 0) {
				return _dequeueAndSignal();
			}

			return null;
		}
		finally {
			_takeLock.unlock();
		}
	}

	/**
	 * Removes and returns the head of the queue, waiting up to the given time
	 * for an element to become available.
	 *
	 * @param  timeout how long to wait, in units of <code>timeUnit</code>
	 * @param  timeUnit the unit of <code>timeout</code>
	 * @return the head element, or <code>null</code> if the queue is still
	 *         empty when the waiting time runs out
	 * @throws InterruptedException if the thread is interrupted while
	 *         acquiring the take lock or while waiting
	 * @throws NullPointerException if <code>timeUnit</code> is
	 *         <code>null</code>
	 */
	public E poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
		if (timeUnit == null) {
			throw new NullPointerException("Time unit is null");
		}

		long nanos = timeUnit.toNanos(timeout);

		_takeLock.lockInterruptibly();

		try {
			while (_count.get() == 0) {
				if (nanos <= 0) {
					return null;
				}

				// awaitNanos returns the remaining time, so repeated wake ups
				// never extend the total wait

				nanos = _notEmptyCondition.awaitNanos(nanos);
			}

			return _dequeueAndSignal();
		}
		finally {
			_takeLock.unlock();
		}
	}

	/**
	 * Returns the capacity minus the current element count.
	 *
	 * @return the number of elements that could currently be inserted
	 */
	public int remainingCapacity() {
		return _capacity - _count.get();
	}

	/**
	 * Removes the first queued element that is equal to the given element.
	 * Both locks are held while the list is searched.
	 *
	 * @param  element the element to remove, compared with
	 *         <code>element.equals</code>
	 * @return <code>true</code> if an element was removed;
	 *         <code>false</code> if <code>element</code> is <code>null</code>
	 *         or no equal element was found
	 */
	public boolean remove(E element) {
		if (element == null) {
			return false;
		}

		_fullyLock();

		try {
			Node<E> previousNode = _headNode;

			for (Node<E> currentNode = previousNode._nextNode;
					currentNode != null; currentNode = currentNode._nextNode) {

				if (element.equals(currentNode._element)) {
					_unlink(currentNode, previousNode);

					return true;
				}

				previousNode = currentNode;
			}

			return false;
		}
		finally {
			_fullyUnlock();
		}
	}

	/**
	 * Returns the current element count.
	 *
	 * @return the number of elements in the queue
	 */
	public int size() {
		return _count.get();
	}

	/**
	 * Removes and returns the head of the queue, waiting for as long as the
	 * queue is empty.
	 *
	 * @return the head element
	 * @throws InterruptedException if the thread is interrupted while
	 *         acquiring the take lock or while waiting
	 */
	public E take() throws InterruptedException {
		_takeLock.lockInterruptibly();

		try {
			while (_count.get() == 0) {
				_notEmptyCondition.await();
			}

			return _dequeueAndSignal();
		}
		finally {
			_takeLock.unlock();
		}
	}

	/**
	 * Returns the lock that serializes insertions.
	 *
	 * @return the put lock
	 */
	protected ReentrantLock getPutLock() {
		return _putLock;
	}

	/**
	 * Returns the lock that serializes removals and owns the not empty
	 * condition.
	 *
	 * @return the take lock
	 */
	protected ReentrantLock getTakeLock() {
		return _takeLock;
	}

	/**
	 * Unlinks the first real node and returns its element. The unlinked node
	 * becomes the new dummy head. The caller holds the take lock and has
	 * verified that the queue is not empty.
	 */
	private E _dequeue() {
		Node<E> headNode = _headNode;
		Node<E> firstNode = headNode._nextNode;

		headNode._nextNode = null;

		_headNode = firstNode;

		E element = firstNode._element;

		firstNode._element = null;

		return element;
	}

	/**
	 * Dequeues the head element, decrements the count and, if elements remain,
	 * signals one more waiting consumer. The caller holds the take lock and
	 * has verified that the queue is not empty.
	 */
	private E _dequeueAndSignal() {
		E element = _dequeue();

		if (_count.getAndDecrement() > 1) {
			_notEmptyCondition.signal();
		}

		return element;
	}

	/**
	 * Appends a new node holding the element after the tail. The caller holds
	 * the put lock.
	 */
	private void _enqueue(E element) {
		Node<E> node = new Node<E>(element);

		_tailNode._nextNode = node;

		_tailNode = node;
	}

	/**
	 * Acquires the put lock and then the take lock.
	 */
	private void _fullyLock() {
		_putLock.lock();
		_takeLock.lock();
	}

	/**
	 * Releases the take lock and then the put lock.
	 */
	private void _fullyUnlock() {
		_takeLock.unlock();
		_putLock.unlock();
	}

	/**
	 * Signals one thread waiting on the not empty condition. The take lock is
	 * acquired for the duration of the signal.
	 */
	private void _signalNotEmpty() {
		_takeLock.lock();

		try {
			_notEmptyCondition.signal();
		}
		finally {
			_takeLock.unlock();
		}
	}

	/**
	 * Removes <code>currentNode</code>, whose predecessor is
	 * <code>previousNode</code>, from the list and decrements the count. The
	 * caller holds both locks.
	 */
	private void _unlink(Node<E> currentNode, Node<E> previousNode) {
		currentNode._element = null;
		previousNode._nextNode = currentNode._nextNode;

		// Removing the last node moves the tail back to its predecessor

		if (_tailNode == currentNode) {
			_tailNode = previousNode;
		}

		_count.getAndDecrement();
	}

	private final int _capacity;
	private final AtomicInteger _count = new AtomicInteger();
	private Node<E> _headNode;
	private final ReentrantLock _putLock = new ReentrantLock();
	private Node<E> _tailNode;
	private final ReentrantLock _takeLock = new ReentrantLock();

	// Must be declared after _takeLock, field initializers run in textual
	// order

	private final Condition _notEmptyCondition = _takeLock.newCondition();

	/**
	 * A singly linked list node.
	 */
	private static class Node<E> {

		private Node(E element) {
			_element = element;
		}

		private E _element;
		private Node<E> _nextNode;

	}

}