001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.ArrayList;
018 import java.util.HashSet;
019 import java.util.List;
020 import java.util.Set;
021 import java.util.concurrent.AbstractExecutorService;
022 import java.util.concurrent.Executors;
023 import java.util.concurrent.ThreadFactory;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
026 import java.util.concurrent.locks.Condition;
027 import java.util.concurrent.locks.ReentrantLock;
028
029
036 public class ThreadPoolExecutor extends AbstractExecutorService {
037
/**
 * Creates a pool with the given core and maximum sizes and these defaults: a
 * keep alive time of 60 seconds, core thread timeout disabled, a maximum
 * queue size of <code>Integer.MAX_VALUE</code>, an <code>AbortPolicy</code>
 * rejection handler, the JDK default thread factory and a
 * <code>ThreadPoolHandlerAdapter</code>.
 *
 * @param corePoolSize the core pool size
 * @param maxPoolSize the maximum pool size
 */
public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
	this(
		corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
		Integer.MAX_VALUE, new AbortPolicy(),
		Executors.defaultThreadFactory(),
		new ThreadPoolHandlerAdapter());
}
044
/**
 * Creates a pool with explicit sizing, keep alive and queue settings, using
 * an <code>AbortPolicy</code> rejection handler, the JDK default thread
 * factory and a <code>ThreadPoolHandlerAdapter</code>.
 *
 * @param corePoolSize the core pool size
 * @param maxPoolSize the maximum pool size
 * @param keepAliveTime the keep alive time, expressed in
 *        <code>timeUnit</code>
 * @param timeUnit the unit of <code>keepAliveTime</code>
 * @param allowCoreThreadTimeout whether the keep alive time is applied
 *        regardless of the core pool size
 * @param maxQueueSize the maximum size handed to the task queue
 */
public ThreadPoolExecutor(
	int corePoolSize, int maxPoolSize, long keepAliveTime,
	TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {

	this(
		corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
		allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
		Executors.defaultThreadFactory(),
		new ThreadPoolHandlerAdapter());
}
054
/**
 * Creates a fully configured pool.
 *
 * @param  corePoolSize the core pool size
 * @param  maxPoolSize the maximum pool size
 * @param  keepAliveTime the keep alive time, expressed in
 *         <code>timeUnit</code>; stored internally in nanoseconds
 * @param  timeUnit the unit of <code>keepAliveTime</code>
 * @param  allowCoreThreadTimeout whether the keep alive time is applied
 *         regardless of the core pool size
 * @param  maxQueueSize the maximum size handed to the task queue
 * @param  rejectedExecutionHandler the handler given tasks that cannot be
 *         accepted
 * @param  threadFactory the factory used to create worker threads
 * @param  threadPoolHandler the life cycle callback handler
 * @throws IllegalArgumentException if <code>corePoolSize</code> or
 *         <code>keepAliveTime</code> is negative, if
 *         <code>maxPoolSize</code> or <code>maxQueueSize</code> is not
 *         positive, or if <code>maxPoolSize</code> is smaller than
 *         <code>corePoolSize</code>
 * @throws NullPointerException if any of the handler or factory arguments
 *         is <code>null</code>
 */
public ThreadPoolExecutor(
	int corePoolSize, int maxPoolSize, long keepAliveTime,
	TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
	RejectedExecutionHandler rejectedExecutionHandler,
	ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {

	// Numeric arguments are validated before the reference arguments

	if ((corePoolSize < 0) || (maxPoolSize <= 0)) {
		throw new IllegalArgumentException();
	}

	if (maxPoolSize < corePoolSize) {
		throw new IllegalArgumentException();
	}

	if ((keepAliveTime < 0) || (maxQueueSize <= 0)) {
		throw new IllegalArgumentException();
	}

	if (rejectedExecutionHandler == null) {
		throw new NullPointerException();
	}

	if (threadFactory == null) {
		throw new NullPointerException();
	}

	if (threadPoolHandler == null) {
		throw new NullPointerException();
	}

	_corePoolSize = corePoolSize;
	_maxPoolSize = maxPoolSize;
	_keepAliveTime = timeUnit.toNanos(keepAliveTime);
	_allowCoreThreadTimeout = allowCoreThreadTimeout;
	_rejectedExecutionHandler = rejectedExecutionHandler;
	_threadFactory = threadFactory;
	_threadPoolHandler = threadPoolHandler;
	_taskQueue = new TaskQueue<Runnable>(maxQueueSize);
	_workerTasks = new HashSet<WorkerTask>();
}
084
085 public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
086 if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
087 (newMaxPoolSize < newCorePoolSize)) {
088
089 throw new IllegalArgumentException();
090 }
091
092 _mainLock.lock();
093
094 try {
095 int surplusCoreThreads = _corePoolSize - newCorePoolSize;
096 int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
097
098 _corePoolSize = newCorePoolSize;
099 _maxPoolSize = newMaxPoolSize;
100
101 if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
102 ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
103
104 int interruptCount = Math.max(
105 surplusCoreThreads, surplusMaxPoolSize);
106
107 for (WorkerTask workerTask : _workerTasks) {
108 if (interruptCount > 0) {
109 if (workerTask._interruptIfWaiting()) {
110 interruptCount--;
111 }
112 }
113 else {
114 break;
115 }
116 }
117 }
118 else {
119 Runnable runnable = null;
120
121 while ((surplusCoreThreads++ < 0) &&
122 (_poolSize < _corePoolSize) &&
123 ((runnable = _taskQueue.poll()) != null)) {
124
125 _doAddWorkerThread(runnable);
126 }
127 }
128 }
129 finally {
130 _mainLock.unlock();
131 }
132 }
133
134 public boolean awaitTermination(long timeout, TimeUnit timeUnit)
135 throws InterruptedException {
136
137 long nanos = timeUnit.toNanos(timeout);
138
139 _mainLock.lock();
140
141 try {
142 while (true) {
143 if (_runState == _TERMINATED) {
144 return true;
145 }
146
147 if (nanos <= 0) {
148 return false;
149 }
150
151 nanos = _terminationCondition.awaitNanos(nanos);
152 }
153 }
154 finally {
155 _mainLock.unlock();
156 }
157 }
158
159 public void execute(Runnable runnable) {
160 if (runnable == null) {
161 throw new NullPointerException();
162 }
163
164 boolean[] hasWaiterMarker = new boolean[1];
165
166 if ((_runState == _RUNNING) &&
167 _taskQueue.offer(runnable, hasWaiterMarker)) {
168
169 if (_runState != _RUNNING) {
170 if (_taskQueue.remove(runnable)) {
171 _rejectedExecutionHandler.rejectedExecution(runnable, this);
172 }
173
174 return;
175 }
176
177 if (!hasWaiterMarker[0]) {
178 _addWorkerThread();
179 }
180
181 return;
182 }
183
184 _rejectedExecutionHandler.rejectedExecution(runnable, this);
185 }
186
187 public int getActiveCount() {
188 _mainLock.lock();
189
190 try {
191 int count = 0;
192
193 for (WorkerTask workerTask : _workerTasks) {
194 if (workerTask._isLocked()) {
195 count++;
196 }
197 }
198
199 return count;
200 }
201 finally {
202 _mainLock.unlock();
203 }
204 }
205
206 public long getCompletedTaskCount() {
207 _mainLock.lock();
208
209 try {
210 long count = _completedTaskCount;
211
212 for (WorkerTask workerTask : _workerTasks) {
213 count += workerTask._localCompletedTaskCount;
214 }
215
216 return count;
217 }
218 finally {
219 _mainLock.unlock();
220 }
221 }
222
223 public int getCorePoolSize() {
224 return _corePoolSize;
225 }
226
227 public long getKeepAliveTime(TimeUnit timeUnit) {
228 return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
229 }
230
231 public int getLargestPoolSize() {
232 return _largestPoolSize;
233 }
234
235 public int getMaxPoolSize() {
236 return _maxPoolSize;
237 }
238
239 public int getPendingTaskCount() {
240 return _taskQueue.size();
241 }
242
243 public int getPoolSize() {
244 return _poolSize;
245 }
246
247 public RejectedExecutionHandler getRejectedExecutionHandler() {
248 return _rejectedExecutionHandler;
249 }
250
251 public int getRemainingTaskQueueCapacity() {
252 return _taskQueue.remainingCapacity();
253 }
254
255 public long getTaskCount() {
256 _mainLock.lock();
257
258 try {
259 long count = _completedTaskCount;
260
261 for (WorkerTask workerTask : _workerTasks) {
262 count += workerTask._localCompletedTaskCount;
263
264 if (workerTask._isLocked()) {
265 count++;
266 }
267 }
268
269 return count + _taskQueue.size();
270 }
271 finally {
272 _mainLock.unlock();
273 }
274 }
275
276 public ThreadFactory getThreadFactory() {
277 return _threadFactory;
278 }
279
280 public ThreadPoolHandler getThreadPoolHandler() {
281 return _threadPoolHandler;
282 }
283
284 public boolean isAllowCoreThreadTimeout() {
285 return _allowCoreThreadTimeout;
286 }
287
288 public boolean isShutdown() {
289 if (_runState != _RUNNING) {
290 return true;
291 }
292 else {
293 return false;
294 }
295 }
296
297 public boolean isTerminated() {
298 if (_runState == _TERMINATED) {
299 return true;
300 }
301 else {
302 return false;
303 }
304 }
305
306 public boolean isTerminating() {
307 if (_runState == _STOP) {
308 return true;
309 }
310 else {
311 return false;
312 }
313 }
314
315 public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
316 _allowCoreThreadTimeout = allowCoreThreadTimeout;
317 }
318
319 public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
320 if (keepAliveTime < 0) {
321 throw new IllegalArgumentException();
322 }
323
324 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
325 }
326
327 public void setRejectedExecutionHandler(
328 RejectedExecutionHandler rejectedExecutionHandler) {
329
330 if (rejectedExecutionHandler == null) {
331 throw new NullPointerException();
332 }
333
334 _rejectedExecutionHandler = rejectedExecutionHandler;
335 }
336
337 public void setThreadFactory(ThreadFactory threadFactory) {
338 if (threadFactory == null) {
339 throw new NullPointerException();
340 }
341
342 _threadFactory = threadFactory;
343 }
344
345 public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
346 if (threadPoolHandler == null) {
347 throw new NullPointerException();
348 }
349
350 _threadPoolHandler = threadPoolHandler;
351 }
352
353 public void shutdown() {
354 _mainLock.lock();
355
356 try {
357 int state = _runState;
358
359 if (state < _SHUTDOWN) {
360 _runState = _SHUTDOWN;
361 }
362
363 for (WorkerTask workerTask : _workerTasks) {
364 workerTask._interruptIfWaiting();
365 }
366
367 _tryTerminate();
368 }
369 finally {
370 _mainLock.unlock();
371 }
372 }
373
374 public List<Runnable> shutdownNow() {
375 _mainLock.lock();
376
377 try {
378 int state = _runState;
379
380 if (state < _STOP) {
381 _runState = _STOP;
382 }
383
384 for (WorkerTask workerTask : _workerTasks) {
385 workerTask._thread.interrupt();
386 }
387
388 List<Runnable> runnables = new ArrayList<Runnable>();
389
390 _taskQueue.drainTo(runnables);
391
392 _tryTerminate();
393
394 return runnables;
395 }
396 finally {
397 _mainLock.unlock();
398 }
399 }
400
401 @Override
402 protected void finalize() {
403 shutdown();
404 }
405
406 protected ReentrantLock getMainLock() {
407 return _mainLock;
408 }
409
410 protected TaskQueue<Runnable> getTaskQueue() {
411 return _taskQueue;
412 }
413
414 protected Set<WorkerTask> getWorkerTasks() {
415 return _workerTasks;
416 }
417
418 private void _addWorkerThread() {
419 int runState = _runState;
420 int poolSize = _poolSize;
421
422 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
423 ((runState == _SHUTDOWN) && (poolSize == 0) &&
424 !_taskQueue.isEmpty())) {
425
426 _mainLock.lock();
427
428 try {
429 runState = _runState;
430 poolSize = _poolSize;
431
432 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
433 ((runState == _SHUTDOWN) && (poolSize == 0) &&
434 !_taskQueue.isEmpty())) {
435
436 Runnable runnable = _taskQueue.poll();
437
438 if (runnable != null) {
439 _doAddWorkerThread(runnable);
440 }
441 }
442 }
443 finally {
444 _mainLock.unlock();
445 }
446 }
447 }
448
449 private void _doAddWorkerThread(Runnable runnable) {
450 WorkerTask workerTask = new WorkerTask(runnable);
451
452 _workerTasks.add(workerTask);
453
454 int poolSize = ++_poolSize;
455
456 if (poolSize > _largestPoolSize) {
457 _largestPoolSize = poolSize;
458 }
459
460 workerTask._startWork();
461 }
462
463 private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
464 while (true) {
465 try {
466 int state = _runState;
467
468 if (state >= _STOP) {
469 return null;
470 }
471
472 Runnable runnable = null;
473
474 if (state == _SHUTDOWN) {
475 runnable = _taskQueue.poll();
476 }
477 else if ((_poolSize > _corePoolSize) ||
478 _allowCoreThreadTimeout) {
479
480 runnable = _taskQueue.poll(
481 _keepAliveTime, TimeUnit.NANOSECONDS);
482 }
483 else {
484 runnable = _taskQueue.take();
485 }
486
487 if (runnable != null) {
488 return runnable;
489 }
490
491 _mainLock.lock();
492
493 try {
494 if ((_runState >= _STOP) ||
495 ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
496 (_allowCoreThreadTimeout &&
497 ((_poolSize > 1) || _taskQueue.isEmpty())) ||
498 (!_allowCoreThreadTimeout &&
499 (_poolSize > _corePoolSize))) {
500
501 _completedTaskCount +=
502 workerTask._localCompletedTaskCount;
503
504 _workerTasks.remove(workerTask);
505
506 if (--_poolSize == 0) {
507 _tryTerminate();
508 }
509
510 cleanUpMarker[0] = true;
511
512 return null;
513 }
514 }
515 finally {
516 _mainLock.unlock();
517 }
518 }
519 catch (InterruptedException ie) {
520 }
521 }
522 }
523
524 private void _tryTerminate() {
525 if (_poolSize == 0) {
526 int state = _runState;
527
528 if ((state == _STOP) ||
529 ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
530
531 _runState = _TERMINATED;
532
533 _terminationCondition.signalAll();
534 _threadPoolHandler.terminated();
535
536 return;
537 }
538
539 if (!_taskQueue.isEmpty()) {
540 _doAddWorkerThread(_taskQueue.poll());
541 }
542 }
543 }
544
545 private static final int _RUNNING = 0;
546
547 private static final int _SHUTDOWN = 1;
548
549 private static final int _STOP = 2;
550
551 private static final int _TERMINATED = 3;
552
553 private volatile boolean _allowCoreThreadTimeout;
554 private long _completedTaskCount;
555 private volatile int _corePoolSize;
556 private volatile long _keepAliveTime;
557 private volatile int _largestPoolSize;
558 private final ReentrantLock _mainLock = new ReentrantLock();
559 private volatile int _maxPoolSize;
560 private volatile int _poolSize;
561 private volatile RejectedExecutionHandler _rejectedExecutionHandler;
562 private volatile int _runState;
563 private final TaskQueue<Runnable> _taskQueue;
564 private final Condition _terminationCondition = _mainLock.newCondition();
565 private volatile ThreadFactory _threadFactory;
566 private volatile ThreadPoolHandler _threadPoolHandler;
567 private final Set<WorkerTask> _workerTasks;
568
569 private class WorkerTask
570 extends AbstractQueuedSynchronizer implements Runnable {
571
572 public WorkerTask(Runnable runnable) {
573 _runnable = runnable;
574 }
575
576 public void run() {
577 boolean[] cleanUpMarker = new boolean[1];
578
579 try {
580 Runnable runnable = _runnable;
581
582 _runnable = null;
583
584 do {
585 if (runnable != null) {
586 _runTask(runnable);
587
588 runnable = null;
589 }
590 }
591 while ((runnable = _getTask(this, cleanUpMarker)) != null);
592 }
593 finally {
594 if (!cleanUpMarker[0]) {
595 _mainLock.lock();
596
597 try {
598 _completedTaskCount += _localCompletedTaskCount;
599
600 _workerTasks.remove(this);
601
602 if (--_poolSize == 0) {
603 _tryTerminate();
604 }
605 }
606 finally {
607 _mainLock.unlock();
608 }
609 }
610
611 _threadPoolHandler.beforeThreadEnd(_thread);
612 }
613 }
614
615 @Override
616 protected boolean isHeldExclusively() {
617 if (getState() == 1) {
618 return true;
619 }
620 else {
621 return false;
622 }
623 }
624
625 @Override
626 protected boolean tryAcquire(int unused) {
627 return compareAndSetState(0, 1);
628 }
629
630 @Override
631 protected boolean tryRelease(int unused) {
632 setState(0);
633
634 return true;
635 }
636
637 private boolean _interruptIfWaiting() {
638 if (!_thread.isInterrupted() && tryAcquire(1)) {
639 try {
640 _thread.interrupt();
641
642 return true;
643 }
644 finally {
645 _unlock();
646 }
647 }
648
649 return false;
650 }
651
652 private boolean _isLocked() {
653 return isHeldExclusively();
654 }
655
656 private void _lock() {
657 acquire(1);
658 }
659
660 private void _runTask(Runnable task) {
661 _lock();
662
663 try {
664 if ((_runState < _STOP) && Thread.interrupted() &&
665 (_runState >= _STOP)) {
666
667 _thread.interrupt();
668 }
669
670 Throwable throwable = null;
671
672 _threadPoolHandler.beforeExecute(_thread, task);
673
674 try {
675 task.run();
676
677 _localCompletedTaskCount++;
678 }
679 catch (RuntimeException re) {
680 throwable = re;
681
682 throw re;
683 }
684 finally {
685 _threadPoolHandler.afterExecute(task, throwable);
686 }
687 }
688 finally {
689 _unlock();
690 }
691 }
692
693 private void _startWork() {
694 _thread = _threadFactory.newThread(this);
695
696 _threadPoolHandler.beforeThreadStart(_thread);
697
698 _thread.start();
699 }
700
701 private void _unlock() {
702 release(1);
703 }
704
705 private volatile long _localCompletedTaskCount;
706 private Runnable _runnable;
707 private Thread _thread;
708
709 }
710
711 }