001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.ArrayList;
018 import java.util.HashSet;
019 import java.util.List;
020 import java.util.Set;
021 import java.util.concurrent.AbstractExecutorService;
022 import java.util.concurrent.Callable;
023 import java.util.concurrent.Executors;
024 import java.util.concurrent.ThreadFactory;
025 import java.util.concurrent.TimeUnit;
026 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
027 import java.util.concurrent.locks.Condition;
028 import java.util.concurrent.locks.ReentrantLock;
029
030
037 public class ThreadPoolExecutor extends AbstractExecutorService {
038
039 public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
040 this(
041 corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
042 Integer.MAX_VALUE, new AbortPolicy(),
043 Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
044 }
045
046 public ThreadPoolExecutor(
047 int corePoolSize, int maxPoolSize, long keepAliveTime,
048 TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {
049
050 this(
051 corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
052 allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
053 Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
054 }
055
056 public ThreadPoolExecutor(
057 int corePoolSize, int maxPoolSize, long keepAliveTime,
058 TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
059 RejectedExecutionHandler rejectedExecutionHandler,
060 ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {
061
062 if ((corePoolSize < 0) || (maxPoolSize <= 0) ||
063 (maxPoolSize < corePoolSize) || (keepAliveTime < 0) ||
064 (maxQueueSize <= 0)) {
065
066 throw new IllegalArgumentException();
067 }
068
069 if ((rejectedExecutionHandler == null) || (threadFactory == null) ||
070 (threadPoolHandler == null)) {
071
072 throw new NullPointerException();
073 }
074
075 _corePoolSize = corePoolSize;
076 _maxPoolSize = maxPoolSize;
077 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
078 _allowCoreThreadTimeout = allowCoreThreadTimeout;
079 _rejectedExecutionHandler = rejectedExecutionHandler;
080 _threadFactory = threadFactory;
081 _threadPoolHandler = threadPoolHandler;
082 _taskQueue = new TaskQueue<>(maxQueueSize);
083 _workerTasks = new HashSet<>();
084 }
085
086 public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
087 if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
088 (newMaxPoolSize < newCorePoolSize)) {
089
090 throw new IllegalArgumentException();
091 }
092
093 _mainLock.lock();
094
095 try {
096 int surplusCoreThreads = _corePoolSize - newCorePoolSize;
097 int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
098
099 _corePoolSize = newCorePoolSize;
100 _maxPoolSize = newMaxPoolSize;
101
102 if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
103 ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
104
105 int interruptCount = Math.max(
106 surplusCoreThreads, surplusMaxPoolSize);
107
108 for (WorkerTask workerTask : _workerTasks) {
109 if (interruptCount > 0) {
110 if (workerTask._interruptIfWaiting()) {
111 interruptCount--;
112 }
113 }
114 else {
115 break;
116 }
117 }
118 }
119 else {
120 Runnable runnable = null;
121
122 while ((surplusCoreThreads++ < 0) &&
123 (_poolSize < _corePoolSize) &&
124 ((runnable = _taskQueue.poll()) != null)) {
125
126 _doAddWorkerThread(runnable);
127 }
128 }
129 }
130 finally {
131 _mainLock.unlock();
132 }
133 }
134
135 @Override
136 public boolean awaitTermination(long timeout, TimeUnit timeUnit)
137 throws InterruptedException {
138
139 long nanos = timeUnit.toNanos(timeout);
140
141 _mainLock.lock();
142
143 try {
144 while (true) {
145 if (_runState == _TERMINATED) {
146 return true;
147 }
148
149 if (nanos <= 0) {
150 return false;
151 }
152
153 nanos = _terminationCondition.awaitNanos(nanos);
154 }
155 }
156 finally {
157 _mainLock.unlock();
158 }
159 }
160
161 @Override
162 public void execute(Runnable runnable) {
163 if (runnable == null) {
164 throw new NullPointerException();
165 }
166
167 boolean[] hasWaiterMarker = new boolean[1];
168
169 if ((_runState == _RUNNING) &&
170 _taskQueue.offer(runnable, hasWaiterMarker)) {
171
172 if (_runState != _RUNNING) {
173 if (_taskQueue.remove(runnable)) {
174 _rejectedExecutionHandler.rejectedExecution(runnable, this);
175 }
176
177 return;
178 }
179
180 if (!hasWaiterMarker[0]) {
181 _addWorkerThread();
182 }
183
184 return;
185 }
186
187 _rejectedExecutionHandler.rejectedExecution(runnable, this);
188 }
189
190 public int getActiveCount() {
191 _mainLock.lock();
192
193 try {
194 int count = 0;
195
196 for (WorkerTask workerTask : _workerTasks) {
197 if (workerTask._isLocked()) {
198 count++;
199 }
200 }
201
202 return count;
203 }
204 finally {
205 _mainLock.unlock();
206 }
207 }
208
209 public long getCompletedTaskCount() {
210 _mainLock.lock();
211
212 try {
213 long count = _completedTaskCount;
214
215 for (WorkerTask workerTask : _workerTasks) {
216 count += workerTask._localCompletedTaskCount;
217 }
218
219 return count;
220 }
221 finally {
222 _mainLock.unlock();
223 }
224 }
225
226 public int getCorePoolSize() {
227 return _corePoolSize;
228 }
229
230 public long getKeepAliveTime(TimeUnit timeUnit) {
231 return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
232 }
233
234 public int getLargestPoolSize() {
235 return _largestPoolSize;
236 }
237
238 public int getMaxPoolSize() {
239 return _maxPoolSize;
240 }
241
242 public String getName() {
243 return _name;
244 }
245
246 public int getPendingTaskCount() {
247 return _taskQueue.size();
248 }
249
250 public int getPoolSize() {
251 return _poolSize;
252 }
253
254 public RejectedExecutionHandler getRejectedExecutionHandler() {
255 return _rejectedExecutionHandler;
256 }
257
258 public int getRemainingTaskQueueCapacity() {
259 return _taskQueue.remainingCapacity();
260 }
261
262 public long getTaskCount() {
263 _mainLock.lock();
264
265 try {
266 long count = _completedTaskCount;
267
268 for (WorkerTask workerTask : _workerTasks) {
269 count += workerTask._localCompletedTaskCount;
270
271 if (workerTask._isLocked()) {
272 count++;
273 }
274 }
275
276 return count + _taskQueue.size();
277 }
278 finally {
279 _mainLock.unlock();
280 }
281 }
282
283 public ThreadFactory getThreadFactory() {
284 return _threadFactory;
285 }
286
287 public ThreadPoolHandler getThreadPoolHandler() {
288 return _threadPoolHandler;
289 }
290
291 public boolean isAllowCoreThreadTimeout() {
292 return _allowCoreThreadTimeout;
293 }
294
295 @Override
296 public boolean isShutdown() {
297 if (_runState != _RUNNING) {
298 return true;
299 }
300 else {
301 return false;
302 }
303 }
304
305 @Override
306 public boolean isTerminated() {
307 if (_runState == _TERMINATED) {
308 return true;
309 }
310 else {
311 return false;
312 }
313 }
314
315 public boolean isTerminating() {
316 if (_runState == _STOP) {
317 return true;
318 }
319 else {
320 return false;
321 }
322 }
323
324 public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
325 _allowCoreThreadTimeout = allowCoreThreadTimeout;
326 }
327
328 public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
329 if (keepAliveTime < 0) {
330 throw new IllegalArgumentException();
331 }
332
333 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
334 }
335
336 public void setName(String name) {
337 _name = name;
338 }
339
340 public void setRejectedExecutionHandler(
341 RejectedExecutionHandler rejectedExecutionHandler) {
342
343 if (rejectedExecutionHandler == null) {
344 throw new NullPointerException();
345 }
346
347 _rejectedExecutionHandler = rejectedExecutionHandler;
348 }
349
350 public void setThreadFactory(ThreadFactory threadFactory) {
351 if (threadFactory == null) {
352 throw new NullPointerException();
353 }
354
355 _threadFactory = threadFactory;
356 }
357
358 public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
359 if (threadPoolHandler == null) {
360 throw new NullPointerException();
361 }
362
363 _threadPoolHandler = threadPoolHandler;
364 }
365
366 @Override
367 public void shutdown() {
368 _mainLock.lock();
369
370 try {
371 int state = _runState;
372
373 if (state < _SHUTDOWN) {
374 _runState = _SHUTDOWN;
375 }
376
377 for (WorkerTask workerTask : _workerTasks) {
378 workerTask._interruptIfWaiting();
379 }
380
381 _tryTerminate();
382 }
383 finally {
384 _mainLock.unlock();
385 }
386 }
387
388 @Override
389 public List<Runnable> shutdownNow() {
390 _mainLock.lock();
391
392 try {
393 int state = _runState;
394
395 if (state < _STOP) {
396 _runState = _STOP;
397 }
398
399 for (WorkerTask workerTask : _workerTasks) {
400 workerTask._thread.interrupt();
401 }
402
403 List<Runnable> runnables = new ArrayList<>();
404
405 _taskQueue.drainTo(runnables);
406
407 _tryTerminate();
408
409 return runnables;
410 }
411 finally {
412 _mainLock.unlock();
413 }
414 }
415
416 @Override
417 public <T> NoticeableFuture<T> submit(Callable<T> callable) {
418 if (callable == null) {
419 throw new NullPointerException("Callable is null");
420 }
421
422 DefaultNoticeableFuture<T> defaultNoticeableFuture = newTaskFor(
423 callable);
424
425 execute(defaultNoticeableFuture);
426
427 return defaultNoticeableFuture;
428 }
429
430 @Override
431 public NoticeableFuture<?> submit(Runnable runnable) {
432 return submit(runnable, null);
433 }
434
435 @Override
436 public <T> NoticeableFuture<T> submit(Runnable runnable, T result) {
437 if (runnable == null) {
438 throw new NullPointerException("Runnable is null");
439 }
440
441 DefaultNoticeableFuture<T> defaultNoticeableFuture = newTaskFor(
442 runnable, result);
443
444 execute(defaultNoticeableFuture);
445
446 return defaultNoticeableFuture;
447 }
448
449 public NoticeableFuture<Void> terminationNoticeableFuture() {
450 return _terminationDefaultNoticeableFuture;
451 }
452
453 @Override
454 protected void finalize() {
455 shutdown();
456 }
457
458 protected ReentrantLock getMainLock() {
459 return _mainLock;
460 }
461
462 protected TaskQueue<Runnable> getTaskQueue() {
463 return _taskQueue;
464 }
465
466 protected Set<WorkerTask> getWorkerTasks() {
467 return _workerTasks;
468 }
469
470 @Override
471 protected <T> DefaultNoticeableFuture<T> newTaskFor(Callable<T> callable) {
472 return new DefaultNoticeableFuture<>(callable);
473 }
474
475 @Override
476 protected <T> DefaultNoticeableFuture<T> newTaskFor(
477 Runnable runnable, T value) {
478
479 return new DefaultNoticeableFuture<>(runnable, value);
480 }
481
482 private void _addWorkerThread() {
483 int runState = _runState;
484 int poolSize = _poolSize;
485
486 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
487 ((runState == _SHUTDOWN) && (poolSize == 0) &&
488 !_taskQueue.isEmpty())) {
489
490 _mainLock.lock();
491
492 try {
493 runState = _runState;
494 poolSize = _poolSize;
495
496 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
497 ((runState == _SHUTDOWN) && (poolSize == 0) &&
498 !_taskQueue.isEmpty())) {
499
500 Runnable runnable = _taskQueue.poll();
501
502 if (runnable != null) {
503 _doAddWorkerThread(runnable);
504 }
505 }
506 }
507 finally {
508 _mainLock.unlock();
509 }
510 }
511 }
512
513 private void _doAddWorkerThread(Runnable runnable) {
514 WorkerTask workerTask = new WorkerTask(runnable);
515
516 _workerTasks.add(workerTask);
517
518 int poolSize = ++_poolSize;
519
520 if (poolSize > _largestPoolSize) {
521 _largestPoolSize = poolSize;
522 }
523
524 workerTask._startWork();
525 }
526
527 private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
528 while (true) {
529 try {
530 int state = _runState;
531
532 if (state >= _STOP) {
533 return null;
534 }
535
536 Runnable runnable = null;
537
538 if (state == _SHUTDOWN) {
539 runnable = _taskQueue.poll();
540 }
541 else if ((_poolSize > _corePoolSize) ||
542 _allowCoreThreadTimeout) {
543
544 runnable = _taskQueue.poll(
545 _keepAliveTime, TimeUnit.NANOSECONDS);
546 }
547 else {
548 runnable = _taskQueue.take();
549 }
550
551 if (runnable != null) {
552 return runnable;
553 }
554
555 _mainLock.lock();
556
557 try {
558 if ((_runState >= _STOP) ||
559 ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
560 (_allowCoreThreadTimeout &&
561 ((_poolSize > 1) || _taskQueue.isEmpty())) ||
562 (!_allowCoreThreadTimeout &&
563 (_poolSize > _corePoolSize))) {
564
565 _completedTaskCount +=
566 workerTask._localCompletedTaskCount;
567
568 _workerTasks.remove(workerTask);
569
570 if (--_poolSize == 0) {
571 _tryTerminate();
572 }
573
574 cleanUpMarker[0] = true;
575
576 return null;
577 }
578 }
579 finally {
580 _mainLock.unlock();
581 }
582 }
583 catch (InterruptedException ie) {
584 }
585 }
586 }
587
588 private void _tryTerminate() {
589 if (_poolSize == 0) {
590 int state = _runState;
591
592 if ((state == _STOP) ||
593 ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
594
595 _runState = _TERMINATED;
596
597 _terminationCondition.signalAll();
598
599 _threadPoolHandler.terminated();
600
601 _terminationDefaultNoticeableFuture.run();
602
603 return;
604 }
605
606 if (!_taskQueue.isEmpty()) {
607 _doAddWorkerThread(_taskQueue.poll());
608 }
609 }
610 }
611
612 private static final int _RUNNING = 0;
613
614 private static final int _SHUTDOWN = 1;
615
616 private static final int _STOP = 2;
617
618 private static final int _TERMINATED = 3;
619
620 private volatile boolean _allowCoreThreadTimeout;
621 private long _completedTaskCount;
622 private volatile int _corePoolSize;
623 private volatile long _keepAliveTime;
624 private volatile int _largestPoolSize;
625 private final ReentrantLock _mainLock = new ReentrantLock();
626 private volatile int _maxPoolSize;
627 private String _name;
628 private volatile int _poolSize;
629 private volatile RejectedExecutionHandler _rejectedExecutionHandler;
630 private volatile int _runState;
631 private final TaskQueue<Runnable> _taskQueue;
632 private final Condition _terminationCondition = _mainLock.newCondition();
633
634 private final DefaultNoticeableFuture<Void>
635 _terminationDefaultNoticeableFuture =
636 new DefaultNoticeableFuture<Void>() {
637
638 @Override
639 public boolean cancel(boolean mayInterruptIfRunning) {
640 return false;
641 }
642
643 };
644
645 private volatile ThreadFactory _threadFactory;
646 private volatile ThreadPoolHandler _threadPoolHandler;
647 private final Set<WorkerTask> _workerTasks;
648
649 private class WorkerTask
650 extends AbstractQueuedSynchronizer implements Runnable {
651
652 public WorkerTask(Runnable runnable) {
653 _runnable = runnable;
654 }
655
656 @Override
657 public void run() {
658 boolean[] cleanUpMarker = new boolean[1];
659
660 try {
661 Runnable runnable = _runnable;
662
663 _runnable = null;
664
665 do {
666 if (runnable != null) {
667 _runTask(runnable);
668
669 runnable = null;
670 }
671 }
672 while ((runnable = _getTask(this, cleanUpMarker)) != null);
673 }
674 finally {
675 if (!cleanUpMarker[0]) {
676 _mainLock.lock();
677
678 try {
679 _completedTaskCount += _localCompletedTaskCount;
680
681 _workerTasks.remove(this);
682
683 if (--_poolSize == 0) {
684 _tryTerminate();
685 }
686 }
687 finally {
688 _mainLock.unlock();
689 }
690 }
691
692 _threadPoolHandler.beforeThreadEnd(_thread);
693 }
694 }
695
696 @Override
697 protected boolean isHeldExclusively() {
698 if (getState() == 1) {
699 return true;
700 }
701 else {
702 return false;
703 }
704 }
705
706 @Override
707 protected boolean tryAcquire(int unused) {
708 return compareAndSetState(0, 1);
709 }
710
711 @Override
712 protected boolean tryRelease(int unused) {
713 setState(0);
714
715 return true;
716 }
717
718 private boolean _interruptIfWaiting() {
719 if (!_thread.isInterrupted() && tryAcquire(1)) {
720 try {
721 _thread.interrupt();
722
723 return true;
724 }
725 finally {
726 _unlock();
727 }
728 }
729
730 return false;
731 }
732
733 private boolean _isLocked() {
734 return isHeldExclusively();
735 }
736
737 private void _lock() {
738 acquire(1);
739 }
740
741 private void _runTask(Runnable task) {
742 _lock();
743
744 try {
745 if ((_runState < _STOP) && Thread.interrupted() &&
746 (_runState >= _STOP)) {
747
748 _thread.interrupt();
749 }
750
751 Throwable throwable = null;
752
753 _threadPoolHandler.beforeExecute(_thread, task);
754
755 try {
756 task.run();
757
758 _localCompletedTaskCount++;
759 }
760 catch (RuntimeException re) {
761 throwable = re;
762
763 throw re;
764 }
765 finally {
766 _threadPoolHandler.afterExecute(task, throwable);
767 }
768 }
769 finally {
770 _unlock();
771 }
772 }
773
774 private void _startWork() {
775 _thread = _threadFactory.newThread(this);
776
777 _threadPoolHandler.beforeThreadStart(_thread);
778
779 _thread.start();
780 }
781
782 private void _unlock() {
783 release(1);
784 }
785
786 private volatile long _localCompletedTaskCount;
787 private Runnable _runnable;
788 private Thread _thread;
789
790 }
791
792 }