001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.Collection;
018 import java.util.concurrent.TimeUnit;
019 import java.util.concurrent.atomic.AtomicInteger;
020 import java.util.concurrent.locks.Condition;
021 import java.util.concurrent.locks.ReentrantLock;
022
023
/**
 * A capacity-bounded FIFO queue backed by a singly linked list of nodes.
 *
 * <p>
 * Insertions are serialized by a put lock and removals by a separate take
 * lock; the number of queued elements is tracked in an
 * {@link AtomicInteger} that both sides read and update. Consumers that find
 * the queue empty can block on a condition bound to the take lock.
 * </p>
 *
 * <p>
 * The list always starts with a dummy head node whose element is
 * <code>null</code>; the first real element lives in the node after it.
 * </p>
 *
 * @param <E> the type of the queued elements
 */
public class TaskQueue<E> {

    /**
     * Creates a queue whose capacity is {@link Integer#MAX_VALUE}.
     */
    public TaskQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a queue that holds at most <code>capacity</code> elements.
     *
     * @param  capacity the maximum number of elements, must be positive
     * @throws IllegalArgumentException if <code>capacity</code> is zero or
     *         negative
     */
    public TaskQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException(
                "Capacity must be positive: " + capacity);
        }

        _capacity = capacity;

        // Dummy head node; head and tail coincide while the queue is empty

        _headNode = new Node<E>(null);
        _tailNode = _headNode;

        _notEmptyCondition = _takeLock.newCondition();
    }

    /**
     * Moves the elements that are queued at the time of the call into the
     * given collection, in FIFO order.
     *
     * <p>
     * If <code>collection.add</code> throws, the elements that were already
     * transferred stay removed from this queue and the exception propagates.
     * </p>
     *
     * @param  collection the collection receiving the elements
     * @return the number of elements transferred
     * @throws NullPointerException if <code>collection</code> is
     *         <code>null</code>
     */
    public int drainTo(Collection<E> collection) {
        if (collection == null) {
            throw new NullPointerException("Collection is null");
        }

        _takeLock.lock();

        try {
            Node<E> headNode = _headNode;

            // Snapshot of the size; elements offered afterwards are left alone

            int size = _count.get();

            int count = 0;

            try {
                while (count < size) {
                    Node<E> currentNode = headNode._nextNode;

                    collection.add(currentNode._element);

                    // The drained node becomes the new dummy head

                    currentNode._element = null;

                    headNode._nextNode = null;

                    headNode = currentNode;

                    count++;
                }

                return count;
            }
            finally {

                // Commit whatever was transferred, even if add() threw

                if (count > 0) {
                    _headNode = headNode;

                    _count.getAndAdd(-count);
                }
            }
        }
        finally {
            _takeLock.unlock();
        }
    }

    /**
     * Returns <code>true</code> if the element count is currently zero.
     *
     * @return <code>true</code> if the queue holds no elements
     */
    public boolean isEmpty() {
        return _count.get() == 0;
    }

    /**
     * Appends the element to the tail of the queue if there is room. The call
     * never waits for room, although it does wait for the put lock.
     *
     * <p>
     * After a successful insert, <code>hasWaiterMarker[0]</code> is set to
     * <code>true</code> when the element count has not grown past its
     * pre-insert value, which means at least one element was removed
     * concurrently. The marker is never reset to <code>false</code> by this
     * method.
     * </p>
     *
     * @param  element the element to add
     * @param  hasWaiterMarker a non-empty array whose first slot receives the
     *         marker
     * @return <code>true</code> if the element was enqueued;
     *         <code>false</code> if the queue was full
     * @throws IllegalArgumentException if <code>hasWaiterMarker</code> is
     *         empty
     * @throws NullPointerException if <code>element</code> or
     *         <code>hasWaiterMarker</code> is <code>null</code>
     */
    public boolean offer(E element, boolean[] hasWaiterMarker) {
        if (element == null) {
            throw new NullPointerException("Element is null");
        }

        if (hasWaiterMarker == null) {
            throw new NullPointerException("Has waiter marker is null");
        }

        if (hasWaiterMarker.length == 0) {
            throw new IllegalArgumentException("Has waiter marker is empty");
        }

        // Lock free fast path for an obviously full queue

        if (_count.get() == _capacity) {
            return false;
        }

        // Stays negative unless an element is actually enqueued

        int count = -1;

        _putLock.lock();

        try {

            // Recheck under the put lock, another producer may have filled the
            // queue since the fast path check. Only producers increase the
            // count, so it cannot exceed the capacity while this lock is held.

            if (_count.get() < _capacity) {
                _enqueue(element);

                // The pre-insert count must come from the atomic increment
                // itself. A separate earlier read can be stale because takers
                // decrement concurrently, which would skip the signal below
                // and strand a waiting taker.

                count = _count.getAndIncrement();

                if (count == 0) {

                    // The queue was empty, so a taker may be parked on the
                    // condition. Signalling requires holding the take lock.

                    _takeLock.lock();

                    try {
                        _notEmptyCondition.signal();
                    }
                    finally {
                        _takeLock.unlock();
                    }
                }

                // Without concurrent removals the count is now count + 1. If
                // it is not larger than the pre-insert value, some taker has
                // already consumed an element.

                if (count >= _count.get()) {
                    hasWaiterMarker[0] = true;
                }
            }
        }
        finally {
            _putLock.unlock();
        }

        return count >= 0;
    }

    /**
     * Removes and returns the head element without waiting for one to
     * arrive.
     *
     * @return the head element, or <code>null</code> if the queue is empty
     */
    public E poll() {

        // Lock free fast path for an empty queue

        if (_count.get() == 0) {
            return null;
        }

        _takeLock.lock();

        try {
            if (_count.get() > 0) {
                return _dequeueAndSignal();
            }

            return null;
        }
        finally {
            _takeLock.unlock();
        }
    }

    /**
     * Removes and returns the head element, waiting up to the given time for
     * one to become available.
     *
     * @param  timeout how long to wait, in units of <code>timeUnit</code>
     * @param  timeUnit the unit of <code>timeout</code>
     * @return the head element, or <code>null</code> if the wait time elapsed
     *         while the queue was still empty
     * @throws InterruptedException if the thread is interrupted while
     *         acquiring the take lock or while waiting
     */
    public E poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
        long nanos = timeUnit.toNanos(timeout);

        _takeLock.lockInterruptibly();

        try {
            while (_count.get() == 0) {
                if (nanos <= 0) {
                    return null;
                }

                // awaitNanos reports the remaining time, so repeated wake ups
                // never extend the overall wait

                nanos = _notEmptyCondition.awaitNanos(nanos);
            }

            return _dequeueAndSignal();
        }
        finally {
            _takeLock.unlock();
        }
    }

    /**
     * Returns the capacity minus the current element count.
     *
     * @return the number of additional elements the queue can currently hold
     */
    public int remainingCapacity() {
        return _capacity - _count.get();
    }

    /**
     * Removes the first queued element that equals the given element. Both
     * locks are held for the duration of the scan.
     *
     * @param  element the element to remove
     * @return <code>true</code> if a matching element was removed;
     *         <code>false</code> if none was found or <code>element</code> is
     *         <code>null</code>
     */
    public boolean remove(E element) {
        if (element == null) {
            return false;
        }

        _fullyLock();

        try {
            Node<E> previousNode = _headNode;
            Node<E> currentNode = previousNode._nextNode;

            while (currentNode != null) {
                if (element.equals(currentNode._element)) {
                    _unlink(currentNode, previousNode);

                    return true;
                }

                previousNode = currentNode;
                currentNode = currentNode._nextNode;
            }

            return false;
        }
        finally {
            _fullyUnlock();
        }
    }

    /**
     * Returns the current element count.
     *
     * @return the number of queued elements
     */
    public int size() {
        return _count.get();
    }

    /**
     * Removes and returns the head element, waiting for as long as the queue
     * is empty.
     *
     * @return the head element
     * @throws InterruptedException if the thread is interrupted while
     *         acquiring the take lock or while waiting
     */
    public E take() throws InterruptedException {
        _takeLock.lockInterruptibly();

        try {
            while (_count.get() == 0) {
                _notEmptyCondition.await();
            }

            return _dequeueAndSignal();
        }
        finally {
            _takeLock.unlock();
        }
    }

    /**
     * Returns the lock that serializes insertions.
     *
     * @return the put lock
     */
    protected ReentrantLock getPutLock() {
        return _putLock;
    }

    /**
     * Returns the lock that serializes removals.
     *
     * @return the take lock
     */
    protected ReentrantLock getTakeLock() {
        return _takeLock;
    }

    /**
     * Unlinks the first real node and returns its element. The unlinked node
     * becomes the new dummy head. Every caller in this class holds the take
     * lock and has verified that the queue is not empty.
     */
    private E _dequeue() {
        Node<E> headNode = _headNode;
        Node<E> firstNode = headNode._nextNode;

        headNode._nextNode = null;

        _headNode = firstNode;

        E element = firstNode._element;

        firstNode._element = null;

        return element;
    }

    /**
     * Dequeues the head element, decrements the count and, if elements
     * remain, signals the not empty condition so another waiting taker can
     * proceed. Every caller in this class holds the take lock and has
     * verified that the queue is not empty.
     */
    private E _dequeueAndSignal() {
        E element = _dequeue();

        if (_count.getAndDecrement() > 1) {
            _notEmptyCondition.signal();
        }

        return element;
    }

    /**
     * Links a new node holding the element after the current tail. Every
     * caller in this class holds the put lock.
     */
    private void _enqueue(E element) {
        _tailNode._nextNode = new Node<E>(element);

        _tailNode = _tailNode._nextNode;
    }

    /**
     * Acquires the put lock and then the take lock.
     */
    private void _fullyLock() {
        _putLock.lock();
        _takeLock.lock();
    }

    /**
     * Releases the take lock and then the put lock, the reverse of the
     * acquisition order.
     */
    private void _fullyUnlock() {
        _takeLock.unlock();
        _putLock.unlock();
    }

    /**
     * Removes <code>currentNode</code> from the list by linking
     * <code>previousNode</code> past it, moves the tail back if the removed
     * node was the tail, and decrements the count.
     */
    private void _unlink(Node<E> currentNode, Node<E> previousNode) {
        currentNode._element = null;
        previousNode._nextNode = currentNode._nextNode;

        if (_tailNode == currentNode) {
            _tailNode = previousNode;
        }

        _count.getAndDecrement();
    }

    private final int _capacity;
    private final AtomicInteger _count = new AtomicInteger();
    private Node<E> _headNode;
    private final Condition _notEmptyCondition;
    private final ReentrantLock _putLock = new ReentrantLock();
    private Node<E> _tailNode;

    // The take lock is created in fair mode

    private final ReentrantLock _takeLock = new ReentrantLock(true);

    /**
     * A singly linked list node holding one element.
     */
    private static class Node<E> {

        private Node(E element) {
            _element = element;
        }

        private E _element;
        private Node<E> _nextNode;

    }

}