001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.Collection;
018 import java.util.concurrent.TimeUnit;
019 import java.util.concurrent.atomic.AtomicInteger;
020 import java.util.concurrent.locks.Condition;
021 import java.util.concurrent.locks.ReentrantLock;
022
023
/**
 * A bounded FIFO queue backed by a singly linked list.
 *
 * <p>
 * Insertion at the tail is guarded by a "put" lock and removal at the head is
 * guarded by a separate "take" lock; the number of queued elements is tracked
 * in an {@link AtomicInteger} that both sides read and update. Operations that
 * touch arbitrary nodes ({@link #remove(Object)}) acquire both locks.
 * </p>
 *
 * <p>
 * Producers never block waiting for space: {@link #offer(Object, boolean[])}
 * simply returns <code>false</code> when the queue is full. Consumers may
 * block in {@link #take()} or {@link #poll(long, TimeUnit)} until an element
 * becomes available.
 * </p>
 */
public class TaskQueue<E> {

	/**
	 * Creates a queue with a capacity of {@link Integer#MAX_VALUE}.
	 */
	public TaskQueue() {
		this(Integer.MAX_VALUE);
	}

	/**
	 * Creates a queue that holds at most <code>capacity</code> elements.
	 *
	 * @param  capacity the maximum number of elements, must be positive
	 * @throws IllegalArgumentException if <code>capacity</code> is not
	 *         positive
	 */
	public TaskQueue(int capacity) {
		if (capacity <= 0) {
			throw new IllegalArgumentException();
		}

		_capacity = capacity;

		// The list always starts with an element-less sentinel node; the first
		// real element is the sentinel's successor

		_headNode = new Node<>(null);
		_tailNode = _headNode;

		_notEmptyCondition = _takeLock.newCondition();
	}

	/**
	 * Moves the elements that are queued at the time of the call into the
	 * collection, in FIFO order.
	 *
	 * <p>
	 * If <code>collection.add</code> throws, the exception propagates; the
	 * elements that were already added stay removed from this queue and the
	 * remaining ones stay queued.
	 * </p>
	 *
	 * @param  collection the collection to receive the elements
	 * @return the number of elements transferred
	 * @throws NullPointerException if <code>collection</code> is
	 *         <code>null</code>
	 */
	public int drainTo(Collection<E> collection) {
		if (collection == null) {
			throw new NullPointerException();
		}

		_takeLock.lock();

		try {
			Node<E> headNode = _headNode;

			// Snapshot the size once; elements offered while draining are left
			// for a later call

			int size = _count.get();

			int count = 0;

			try {
				while (count < size) {
					Node<E> currentNode = headNode._nextNode;

					collection.add(currentNode._element);

					// The drained node becomes the new sentinel, so it must not
					// keep a reference to its element

					currentNode._element = null;

					headNode._nextNode = null;

					headNode = currentNode;

					count++;
				}

				return count;
			}
			finally {

				// Publish the progress even if collection.add failed part way
				// through

				if (count > 0) {
					_headNode = headNode;

					_count.getAndAdd(-count);
				}
			}
		}
		finally {
			_takeLock.unlock();
		}
	}

	/**
	 * Returns <code>true</code> if the element count is currently zero.
	 *
	 * @return <code>true</code> if the element count is currently zero
	 */
	public boolean isEmpty() {
		return _count.get() == 0;
	}

	/**
	 * Appends the element to the tail of this queue if the capacity permits.
	 * This method does not wait for space to become available.
	 *
	 * <p>
	 * After a successful insertion, <code>hasWaiterMarker[0]</code> is set to
	 * <code>true</code> when the element count read after the insertion is
	 * not greater than the count read before it, which means at least one
	 * element was removed concurrently. The marker is never reset to
	 * <code>false</code> by this method. NOTE(review): callers presumably use
	 * this as a hint that a consumer is already active; confirm against the
	 * callers.
	 * </p>
	 *
	 * @param  element the element to add
	 * @param  hasWaiterMarker a non-empty array whose first slot receives the
	 *         hint described above
	 * @return <code>true</code> if and only if the element was added to this
	 *         queue; <code>false</code> if the queue was full
	 * @throws IllegalArgumentException if <code>hasWaiterMarker</code> is
	 *         empty
	 * @throws NullPointerException if <code>element</code> or
	 *         <code>hasWaiterMarker</code> is <code>null</code>
	 */
	public boolean offer(E element, boolean[] hasWaiterMarker) {
		if ((element == null) || (hasWaiterMarker == null)) {
			throw new NullPointerException();
		}

		if (hasWaiterMarker.length == 0) {
			throw new IllegalArgumentException();
		}

		// Cheap rejection without taking the lock

		if (_count.get() == _capacity) {
			return false;
		}

		// Tracks whether the element was really linked in. The count read
		// below must not be used for this purpose: it is non-negative even
		// when another producer filled the queue before the put lock was
		// acquired.

		boolean enqueued = false;

		_putLock.lock();

		try {

			// Recheck under the put lock; only producers increase the count
			// and they all hold this lock while doing so

			int count = _count.get();

			if (count < _capacity) {
				_enqueue(element);

				enqueued = true;

				if (_count.getAndIncrement() == 0) {

					// The queue just went from empty to non-empty, wake up a
					// consumer that may be waiting for an element

					_takeLock.lock();

					try {
						_notEmptyCondition.signal();
					}
					finally {
						_takeLock.unlock();
					}
				}

				// The count did not grow despite the insertion, so something
				// was removed concurrently

				if (count >= _count.get()) {
					hasWaiterMarker[0] = true;
				}
			}
		}
		finally {
			_putLock.unlock();
		}

		return enqueued;
	}

	/**
	 * Removes and returns the head of this queue without waiting.
	 *
	 * @return the head element, or <code>null</code> if this queue is empty
	 */
	public E poll() {

		// Cheap exit without taking the lock

		if (_count.get() == 0) {
			return null;
		}

		E element = null;

		_takeLock.lock();

		try {
			if (_count.get() > 0) {
				element = _dequeue();

				// More elements remain, pass the signal on to the next waiting
				// consumer

				if (_count.getAndDecrement() > 1) {
					_notEmptyCondition.signal();
				}
			}
		}
		finally {
			_takeLock.unlock();
		}

		return element;
	}

	/**
	 * Removes and returns the head of this queue, waiting up to the given
	 * time for an element to become available.
	 *
	 * @param  timeout how long to wait, in units of <code>timeUnit</code>
	 * @param  timeUnit the unit of <code>timeout</code>
	 * @return the head element, or <code>null</code> if the wait time elapsed
	 *         while the queue was still empty
	 * @throws InterruptedException if the thread is interrupted while
	 *         acquiring the take lock or while waiting
	 */
	public E poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
		E element = null;

		long nanos = timeUnit.toNanos(timeout);

		_takeLock.lockInterruptibly();

		try {
			while (_count.get() == 0) {
				if (nanos <= 0) {
					return null;
				}

				// awaitNanos returns the remaining wait time, so repeated
				// wake-ups never extend the overall timeout

				nanos = _notEmptyCondition.awaitNanos(nanos);
			}

			element = _dequeue();

			if (_count.getAndDecrement() > 1) {
				_notEmptyCondition.signal();
			}
		}
		finally {
			_takeLock.unlock();
		}

		return element;
	}

	/**
	 * Returns the capacity minus the current element count.
	 *
	 * @return the capacity minus the current element count
	 */
	public int remainingCapacity() {
		return _capacity - _count.get();
	}

	/**
	 * Removes the first queued element that equals the given element.
	 *
	 * @param  element the element to remove
	 * @return <code>true</code> if an element was removed; <code>false</code>
	 *         if <code>element</code> is <code>null</code> or no equal element
	 *         was found
	 */
	public boolean remove(E element) {
		if (element == null) {
			return false;
		}

		// Interior nodes are touched, so both ends of the list must be frozen

		_fullyLock();

		try {
			Node<E> previousNode = _headNode;
			Node<E> currentNode = previousNode._nextNode;

			while (currentNode != null) {
				if (element.equals(currentNode._element)) {
					_unlink(currentNode, previousNode);

					return true;
				}

				previousNode = currentNode;
				currentNode = currentNode._nextNode;
			}

			return false;
		}
		finally {
			_fullyUnlock();
		}
	}

	/**
	 * Returns the current element count.
	 *
	 * @return the current element count
	 */
	public int size() {
		return _count.get();
	}

	/**
	 * Removes and returns the head of this queue, waiting until an element
	 * becomes available.
	 *
	 * @return the head element
	 * @throws InterruptedException if the thread is interrupted while
	 *         acquiring the take lock or while waiting
	 */
	public E take() throws InterruptedException {
		E element = null;

		_takeLock.lockInterruptibly();

		try {
			while (_count.get() == 0) {
				_notEmptyCondition.await();
			}

			element = _dequeue();

			if (_count.getAndDecrement() > 1) {
				_notEmptyCondition.signal();
			}
		}
		finally {
			_takeLock.unlock();
		}

		return element;
	}

	/**
	 * Returns the lock that guards insertion at the tail.
	 *
	 * @return the lock that guards insertion at the tail
	 */
	protected ReentrantLock getPutLock() {
		return _putLock;
	}

	/**
	 * Returns the lock that guards removal at the head.
	 *
	 * @return the lock that guards removal at the head
	 */
	protected ReentrantLock getTakeLock() {
		return _takeLock;
	}

	/**
	 * Unlinks the first real node and returns its element. The node itself
	 * becomes the new sentinel. Does not update the element count.
	 */
	private E _dequeue() {
		Node<E> headNode = _headNode;
		Node<E> firstNode = headNode._nextNode;

		headNode._nextNode = null;

		_headNode = firstNode;

		E element = firstNode._element;

		firstNode._element = null;

		return element;
	}

	/**
	 * Links a new node holding the element after the current tail. Does not
	 * update the element count.
	 */
	private void _enqueue(E element) {
		_tailNode._nextNode = new Node<>(element);

		_tailNode = _tailNode._nextNode;
	}

	/**
	 * Acquires the put lock and then the take lock.
	 */
	private void _fullyLock() {
		_putLock.lock();
		_takeLock.lock();
	}

	/**
	 * Releases the locks in the reverse order of {@link #_fullyLock()}.
	 */
	private void _fullyUnlock() {
		_takeLock.unlock();
		_putLock.unlock();
	}

	/**
	 * Removes <code>currentNode</code>, whose predecessor is
	 * <code>previousNode</code>, from the list and decrements the element
	 * count.
	 */
	private void _unlink(Node<E> currentNode, Node<E> previousNode) {
		currentNode._element = null;
		previousNode._nextNode = currentNode._nextNode;

		if (_tailNode == currentNode) {
			_tailNode = previousNode;
		}

		_count.getAndDecrement();
	}

	private final int _capacity;
	private final AtomicInteger _count = new AtomicInteger();
	private Node<E> _headNode;
	private final Condition _notEmptyCondition;
	private final ReentrantLock _putLock = new ReentrantLock();
	private Node<E> _tailNode;
	private final ReentrantLock _takeLock = new ReentrantLock(true);

	/**
	 * A singly linked list node.
	 */
	private static class Node<E> {

		private Node(E element) {
			_element = element;
		}

		private E _element;
		private Node<E> _nextNode;

	}

}