001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.Collection;
018 import java.util.concurrent.TimeUnit;
019 import java.util.concurrent.atomic.AtomicInteger;
020 import java.util.concurrent.locks.Condition;
021 import java.util.concurrent.locks.ReentrantLock;
022
023
026 public class TaskQueue<E> {
027
028 public TaskQueue() {
029 this(Integer.MAX_VALUE);
030 }
031
032 public TaskQueue(int capacity) {
033 if (capacity <= 0) {
034 throw new IllegalArgumentException();
035 }
036
037 _capacity = capacity;
038 _headNode = new Node<E>(null);
039 _tailNode = _headNode;
040 }
041
042 public int drainTo(Collection<E> collection) {
043 if (collection == null) {
044 throw new NullPointerException();
045 }
046
047 _takeLock.lock();
048
049 try {
050 Node<E> headNode = _headNode;
051
052 int size = _count.get();
053
054 int count = 0;
055
056 try {
057 while (count < size) {
058 Node<E> currentNode = headNode._nextNode;
059
060 collection.add(currentNode._element);
061
062 currentNode._element = null;
063
064 headNode._nextNode = null;
065
066 headNode = currentNode;
067
068 count++;
069 }
070
071 return count;
072 }
073 finally {
074 if (count > 0) {
075 _headNode = headNode;
076
077 _count.getAndAdd(-count);
078 }
079 }
080 }
081 finally {
082 _takeLock.unlock();
083 }
084 }
085
086 public boolean isEmpty() {
087 if (_count.get() == 0) {
088 return true;
089 }
090 else {
091 return false;
092 }
093 }
094
095 public boolean offer(E element, boolean[] hasWaiterMarker) {
096 if ((element == null) || (hasWaiterMarker == null)) {
097 throw new NullPointerException();
098 }
099
100 if (hasWaiterMarker.length == 0) {
101 throw new IllegalArgumentException();
102 }
103
104 if (_count.get() == _capacity) {
105 return false;
106 }
107
108 int count = -1;
109
110 _putLock.lock();
111
112 try {
113 if (_count.get() < _capacity) {
114 _enqueue(element);
115
116 count = _count.getAndIncrement();
117
118 _takeLock.lock();
119
120 try {
121 hasWaiterMarker[0] = _takeLock.hasWaiters(
122 _notEmptyCondition);
123
124 if (!hasWaiterMarker[0] && (count >= _count.get())) {
125 hasWaiterMarker[0] = true;
126 }
127 }
128 finally {
129 _takeLock.unlock();
130 }
131 }
132 }
133 finally {
134 _putLock.unlock();
135 }
136
137 if (count == 0) {
138 _takeLock.lock();
139
140 try {
141 _notEmptyCondition.signal();
142 }
143 finally {
144 _takeLock.unlock();
145 }
146 }
147
148 return count >= 0;
149 }
150
151 public E poll() {
152 if (_count.get() == 0) {
153 return null;
154 }
155
156 E element = null;
157
158 _takeLock.lock();
159
160 try {
161 if (_count.get() > 0) {
162 element = _dequeue();
163
164 if (_count.getAndDecrement() > 1) {
165 _notEmptyCondition.signal();
166 }
167 }
168 }
169 finally {
170 _takeLock.unlock();
171 }
172
173 return element;
174 }
175
176 public E poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
177 E element = null;
178
179 long nanos = timeUnit.toNanos(timeout);
180
181 _takeLock.lockInterruptibly();
182
183 try {
184 while (_count.get() == 0) {
185 if (nanos <= 0) {
186 return null;
187 }
188
189 nanos = _notEmptyCondition.awaitNanos(nanos);
190 }
191
192 element = _dequeue();
193
194 if (_count.getAndDecrement() > 1) {
195 _notEmptyCondition.signal();
196 }
197 }
198 finally {
199 _takeLock.unlock();
200 }
201
202 return element;
203 }
204
205 public int remainingCapacity() {
206 return _capacity - _count.get();
207 }
208
209 public boolean remove(E element) {
210 if (element == null) {
211 return false;
212 }
213
214 _fullyLock();
215
216 try {
217 Node<E> previousNode = _headNode;
218 Node<E> currentNode = previousNode._nextNode;
219
220 while (currentNode != null) {
221 if (element.equals(currentNode._element)) {
222 _unlink(currentNode, previousNode);
223
224 return true;
225 }
226
227 previousNode = currentNode;
228 currentNode = currentNode._nextNode;
229 }
230
231 return false;
232 }
233 finally {
234 _fullyUnlock();
235 }
236 }
237
238 public int size() {
239 return _count.get();
240 }
241
242 public E take() throws InterruptedException {
243 E element = null;
244
245 _takeLock.lockInterruptibly();
246
247 try {
248 while (_count.get() == 0) {
249 _notEmptyCondition.await();
250 }
251
252 element = _dequeue();
253
254 if (_count.getAndDecrement() > 1) {
255 _notEmptyCondition.signal();
256 }
257 }
258 finally {
259 _takeLock.unlock();
260 }
261
262 return element;
263 }
264
265 protected ReentrantLock getPutLock() {
266 return _putLock;
267 }
268
269 protected ReentrantLock getTakeLock() {
270 return _takeLock;
271 }
272
273 private E _dequeue() {
274 Node<E> headNode = _headNode;
275 Node<E> firstNode = headNode._nextNode;
276
277 headNode._nextNode = null;
278
279 _headNode = firstNode;
280
281 E element = firstNode._element;
282
283 firstNode._element = null;
284
285 return element;
286 }
287
288 private void _enqueue(E element) {
289 _tailNode._nextNode = new Node<E>(element);
290
291 _tailNode = _tailNode._nextNode;
292 }
293
294 private void _fullyLock() {
295 _putLock.lock();
296 _takeLock.lock();
297 }
298
299 private void _fullyUnlock() {
300 _takeLock.unlock();
301 _putLock.unlock();
302 }
303
304 private void _unlink(Node<E> currentNode, Node<E> previousNode) {
305 currentNode._element = null;
306 previousNode._nextNode = currentNode._nextNode;
307
308 if (_tailNode == currentNode) {
309 _tailNode = previousNode;
310 }
311
312 _count.getAndDecrement();
313 }
314
315 private final int _capacity;
316 private final AtomicInteger _count = new AtomicInteger();
317 private Node<E> _headNode;
318 private final ReentrantLock _putLock = new ReentrantLock();
319 private Node<E> _tailNode;
320 private final ReentrantLock _takeLock = new ReentrantLock();
321 private final Condition _notEmptyCondition = _takeLock.newCondition();
322
323 private static class Node<E> {
324
325 private Node(E element) {
326 _element = element;
327 }
328
329 private E _element;
330 private Node<E> _nextNode;
331
332 }
333
334 }