001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.Collection;
018    import java.util.concurrent.TimeUnit;
019    import java.util.concurrent.atomic.AtomicInteger;
020    import java.util.concurrent.locks.Condition;
021    import java.util.concurrent.locks.ReentrantLock;
022    
/**
 * A bounded FIFO queue built on a singly linked list of nodes.
 *
 * <p>
 * Insertion is serialized by a put lock and removal from the head is
 * serialized by a separate take lock, so one producer and one consumer can
 * work at the same time. The number of queued elements is tracked in an atomic
 * counter that is read and updated by both sides. {@link #remove(Object)}
 * acquires both locks because it may touch any node of the list.
 * </p>
 *
 * @param  <E> the type of the queued elements
 * @author Shuyang Zhou
 */
public class TaskQueue<E> {

	/**
	 * Creates a queue whose capacity is {@link Integer#MAX_VALUE}.
	 */
	public TaskQueue() {
		this(Integer.MAX_VALUE);
	}

	/**
	 * Creates a queue that holds at most <code>capacity</code> elements.
	 *
	 * @param  capacity the maximum number of queued elements
	 * @throws IllegalArgumentException if <code>capacity</code> is not
	 *         positive
	 */
	public TaskQueue(int capacity) {
		if (capacity <= 0) {
			throw new IllegalArgumentException(
				"Capacity must be positive: " + capacity);
		}

		_capacity = capacity;

		// The head is always a placeholder node without an element; the first
		// real element lives in the node after it

		_headNode = new Node<>(null);
		_tailNode = _headNode;

		_notEmptyCondition = _takeLock.newCondition();
	}

	/**
	 * Moves the elements that are queued at the time of the call into the
	 * collection, in queue order.
	 *
	 * <p>
	 * If adding to the collection fails part way through, the elements that
	 * were already added stay removed from this queue and the failure is
	 * propagated to the caller.
	 * </p>
	 *
	 * @param  collection the collection receiving the elements
	 * @return the number of elements transferred
	 * @throws NullPointerException if <code>collection</code> is
	 *         <code>null</code>
	 */
	public int drainTo(Collection<E> collection) {
		if (collection == null) {
			throw new NullPointerException();
		}

		_takeLock.lock();

		try {
			Node<E> headNode = _headNode;

			// Only drain what is present now; elements offered while draining
			// are left in the queue

			int size = _count.get();

			int count = 0;

			try {
				while (count < size) {
					Node<E> currentNode = headNode._nextNode;

					collection.add(currentNode._element);

					// The drained node becomes the new placeholder head, so
					// it must not keep a reference to its element

					currentNode._element = null;

					headNode._nextNode = null;

					headNode = currentNode;

					count++;
				}

				return count;
			}
			finally {

				// Publish the progress even if the collection threw

				if (count > 0) {
					_headNode = headNode;

					_count.getAndAdd(-count);
				}
			}
		}
		finally {
			_takeLock.unlock();
		}
	}

	/**
	 * Returns <code>true</code> if the element counter is currently zero.
	 *
	 * @return <code>true</code> if the element counter is currently zero;
	 *         <code>false</code> otherwise
	 */
	public boolean isEmpty() {
		return _count.get() == 0;
	}

	/**
	 * Appends the element to the tail of the queue if the queue is not full.
	 * This method never waits for space to become available.
	 *
	 * <p>
	 * When the element counter observed after the enqueue is not greater than
	 * the counter observed before it, an element was removed concurrently and
	 * <code>hasWaiterMarker[0]</code> is set to <code>true</code>. The marker
	 * is never reset to <code>false</code> by this method.
	 * </p>
	 *
	 * @param  element the element to enqueue
	 * @param  hasWaiterMarker a non-empty array whose first slot receives the
	 *         marker described above
	 * @return <code>true</code> if the element was enqueued;
	 *         <code>false</code> if the queue was full
	 * @throws IllegalArgumentException if <code>hasWaiterMarker</code> is
	 *         empty
	 * @throws NullPointerException if <code>element</code> or
	 *         <code>hasWaiterMarker</code> is <code>null</code>
	 */
	public boolean offer(E element, boolean[] hasWaiterMarker) {
		if ((element == null) || (hasWaiterMarker == null)) {
			throw new NullPointerException();
		}

		if (hasWaiterMarker.length == 0) {
			throw new IllegalArgumentException();
		}

		// Cheap unlocked check; the authoritative check is repeated below
		// while holding the put lock

		if (_count.get() == _capacity) {
			return false;
		}

		boolean enqueued = false;

		_putLock.lock();

		try {

			// Take a snapshot of count before enqueue

			int count = _count.get();

			// Another producer may have filled the queue between the unlocked
			// check and the lock acquisition. In that case nothing is enqueued
			// and the method must report failure.

			if (count < _capacity) {
				_enqueue(element);

				enqueued = true;

				if (_count.getAndIncrement() == 0) {

					// Signal takers right after enqueue to increase the
					// possibility of a concurrent take

					_takeLock.lock();

					try {
						_notEmptyCondition.signal();
					}
					finally {
						_takeLock.unlock();
					}
				}

				// After enqueue, a non-increasing count implies a concurrent
				// take because there are spare threads

				if (count >= _count.get()) {
					hasWaiterMarker[0] = true;
				}
			}
		}
		finally {
			_putLock.unlock();
		}

		return enqueued;
	}

	/**
	 * Removes and returns the head of the queue without waiting.
	 *
	 * @return the head of the queue, or <code>null</code> if the queue is
	 *         empty
	 */
	public E poll() {
		if (_count.get() == 0) {
			return null;
		}

		E element = null;

		_takeLock.lock();

		try {
			if (_count.get() > 0) {
				element = _dequeue();

				// More elements remain, wake up another waiting taker

				if (_count.getAndDecrement() > 1) {
					_notEmptyCondition.signal();
				}
			}
		}
		finally {
			_takeLock.unlock();
		}

		return element;
	}

	/**
	 * Removes and returns the head of the queue, waiting up to the given time
	 * for an element to become available.
	 *
	 * @param  timeout the maximum time to wait
	 * @param  timeUnit the unit of <code>timeout</code>
	 * @return the head of the queue, or <code>null</code> if the wait time
	 *         elapsed while the queue was still empty
	 * @throws InterruptedException if the thread is interrupted while
	 *         acquiring the take lock or while waiting
	 */
	public E poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
		E element = null;

		long nanos = timeUnit.toNanos(timeout);

		_takeLock.lockInterruptibly();

		try {
			while (_count.get() == 0) {
				if (nanos <= 0) {
					return null;
				}

				// awaitNanos returns the remaining wait time, so the loop
				// keeps the overall deadline

				nanos = _notEmptyCondition.awaitNanos(nanos);
			}

			element = _dequeue();

			// More elements remain, wake up another waiting taker

			if (_count.getAndDecrement() > 1) {
				_notEmptyCondition.signal();
			}
		}
		finally {
			_takeLock.unlock();
		}

		return element;
	}

	/**
	 * Returns the capacity minus the current element counter.
	 *
	 * @return the capacity minus the current element counter
	 */
	public int remainingCapacity() {
		return _capacity - _count.get();
	}

	/**
	 * Removes the first queued element that equals the given element.
	 *
	 * @param  element the element to remove
	 * @return <code>true</code> if a matching element was removed;
	 *         <code>false</code> if <code>element</code> is <code>null</code>
	 *         or no queued element matches
	 */
	public boolean remove(E element) {
		if (element == null) {
			return false;
		}

		_fullyLock();

		try {
			Node<E> previousNode = _headNode;
			Node<E> currentNode = previousNode._nextNode;

			while (currentNode != null) {
				if (element.equals(currentNode._element)) {
					_unlink(currentNode, previousNode);

					return true;
				}

				previousNode = currentNode;
				currentNode = currentNode._nextNode;
			}

			return false;
		}
		finally {
			_fullyUnlock();
		}
	}

	/**
	 * Returns the current value of the element counter.
	 *
	 * @return the current value of the element counter
	 */
	public int size() {
		return _count.get();
	}

	/**
	 * Removes and returns the head of the queue, waiting for as long as the
	 * queue is empty.
	 *
	 * @return the head of the queue
	 * @throws InterruptedException if the thread is interrupted while
	 *         acquiring the take lock or while waiting
	 */
	public E take() throws InterruptedException {
		E element = null;

		_takeLock.lockInterruptibly();

		try {
			while (_count.get() == 0) {
				_notEmptyCondition.await();
			}

			element = _dequeue();

			// More elements remain, wake up another waiting taker

			if (_count.getAndDecrement() > 1) {
				_notEmptyCondition.signal();
			}
		}
		finally {
			_takeLock.unlock();
		}

		return element;
	}

	/**
	 * Returns the lock that guards insertion at the tail.
	 *
	 * @return the lock that guards insertion at the tail
	 */
	protected ReentrantLock getPutLock() {
		return _putLock;
	}

	/**
	 * Returns the lock that guards removal from the head.
	 *
	 * @return the lock that guards removal from the head
	 */
	protected ReentrantLock getTakeLock() {
		return _takeLock;
	}

	/**
	 * Detaches the first element node and makes it the new placeholder head.
	 * Every caller in this class invokes it while holding the take lock and
	 * only after observing a positive element counter.
	 *
	 * @return the element that was stored in the first element node
	 */
	private E _dequeue() {
		Node<E> headNode = _headNode;
		Node<E> firstNode = headNode._nextNode;

		headNode._nextNode = null;

		_headNode = firstNode;

		E element = firstNode._element;

		firstNode._element = null;

		return element;
	}

	/**
	 * Links a new node holding the element after the current tail and makes
	 * it the new tail. The only caller in this class invokes it while holding
	 * the put lock.
	 *
	 * @param element the element to append
	 */
	private void _enqueue(E element) {
		_tailNode._nextNode = new Node<>(element);

		_tailNode = _tailNode._nextNode;
	}

	/**
	 * Acquires the put lock and then the take lock.
	 */
	private void _fullyLock() {
		_putLock.lock();
		_takeLock.lock();
	}

	/**
	 * Releases the take lock and then the put lock, the reverse of the
	 * acquisition order used by {@link #_fullyLock()}.
	 */
	private void _fullyUnlock() {
		_takeLock.unlock();
		_putLock.unlock();
	}

	/**
	 * Removes <code>currentNode</code> from the list and decrements the
	 * element counter. The only caller in this class invokes it while holding
	 * both locks.
	 *
	 * @param currentNode the node to remove
	 * @param previousNode the node directly before <code>currentNode</code>
	 */
	private void _unlink(Node<E> currentNode, Node<E> previousNode) {
		currentNode._element = null;
		previousNode._nextNode = currentNode._nextNode;

		if (_tailNode == currentNode) {
			_tailNode = previousNode;
		}

		_count.getAndDecrement();
	}

	private final int _capacity;
	private final AtomicInteger _count = new AtomicInteger();
	private Node<E> _headNode;
	private final Condition _notEmptyCondition;
	private final ReentrantLock _putLock = new ReentrantLock();
	private Node<E> _tailNode;

	// The take lock is created in fair mode

	private final ReentrantLock _takeLock = new ReentrantLock(true);

	/**
	 * A singly linked list node holding one element.
	 */
	private static class Node<E> {

		private Node(E element) {
			_element = element;
		}

		private E _element;
		private Node<E> _nextNode;

	}

}