001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.ArrayList;
018 import java.util.HashSet;
019 import java.util.List;
020 import java.util.Set;
021 import java.util.concurrent.AbstractExecutorService;
022 import java.util.concurrent.Callable;
023 import java.util.concurrent.Executors;
024 import java.util.concurrent.ThreadFactory;
025 import java.util.concurrent.TimeUnit;
026 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
027 import java.util.concurrent.locks.Condition;
028 import java.util.concurrent.locks.ReentrantLock;
029
030
037 public class ThreadPoolExecutor extends AbstractExecutorService {
038
/**
 * Creates a pool with the given core and maximum sizes and default values
 * for everything else: a keep alive time of 60 seconds, no core thread
 * timeout, and a task queue capacity of <code>Integer.MAX_VALUE</code>. The
 * remaining collaborators (abort policy, default thread factory and
 * handler adapter) are supplied by the six argument constructor this one
 * delegates to.
 *
 * @param corePoolSize the core pool size
 * @param maxPoolSize the maximum pool size
 */
public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
	this(
		corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
		Integer.MAX_VALUE);
}
045
/**
 * Creates a pool with the given sizing and timing settings. The pool is
 * wired with a new {@link AbortPolicy}, the thread factory returned by
 * {@link Executors#defaultThreadFactory()}, and a new
 * {@link ThreadPoolHandlerAdapter}.
 *
 * @param corePoolSize the core pool size
 * @param maxPoolSize the maximum pool size
 * @param keepAliveTime the keep alive time, expressed in
 *        <code>timeUnit</code>
 * @param timeUnit the unit of <code>keepAliveTime</code>
 * @param allowCoreThreadTimeout whether core threads may time out
 * @param maxQueueSize the capacity of the task queue
 */
public ThreadPoolExecutor(
	int corePoolSize, int maxPoolSize, long keepAliveTime,
	TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {

	this(
		corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
		allowCoreThreadTimeout, maxQueueSize,
		new AbortPolicy(),
		Executors.defaultThreadFactory(),
		new ThreadPoolHandlerAdapter());
}
055
/**
 * Creates a pool with every setting and collaborator supplied explicitly.
 *
 * <p>
 * Arguments are validated in the same order as before, but each failure now
 * reports which argument was rejected and, for numeric arguments, its
 * value.
 * </p>
 *
 * @param  corePoolSize the core pool size; must not be negative
 * @param  maxPoolSize the maximum pool size; must be positive and not less
 *         than <code>corePoolSize</code>
 * @param  keepAliveTime the keep alive time expressed in
 *         <code>timeUnit</code>; must not be negative. It is stored in
 *         nanoseconds.
 * @param  timeUnit the unit of <code>keepAliveTime</code>
 * @param  allowCoreThreadTimeout whether core threads may time out
 * @param  maxQueueSize the capacity handed to the task queue; must be
 *         positive
 * @param  rejectedExecutionHandler the handler invoked for tasks that
 *         cannot be accepted
 * @param  threadFactory the factory used to create worker threads
 * @param  threadPoolHandler the handler notified of thread and task life
 *         cycle events
 * @throws IllegalArgumentException if any numeric argument is out of range
 * @throws NullPointerException if <code>rejectedExecutionHandler</code>,
 *         <code>threadFactory</code>, <code>threadPoolHandler</code> or
 *         <code>timeUnit</code> is <code>null</code>
 */
public ThreadPoolExecutor(
	int corePoolSize, int maxPoolSize, long keepAliveTime,
	TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
	RejectedExecutionHandler rejectedExecutionHandler,
	ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {

	if (corePoolSize < 0) {
		throw new IllegalArgumentException(
			"Core pool size is less than 0: " + corePoolSize);
	}

	if (maxPoolSize <= 0) {
		throw new IllegalArgumentException(
			"Maximum pool size is not positive: " + maxPoolSize);
	}

	if (maxPoolSize < corePoolSize) {
		throw new IllegalArgumentException(
			"Maximum pool size " + maxPoolSize +
				" is less than core pool size " + corePoolSize);
	}

	if (keepAliveTime < 0) {
		throw new IllegalArgumentException(
			"Keep alive time is less than 0: " + keepAliveTime);
	}

	if (maxQueueSize <= 0) {
		throw new IllegalArgumentException(
			"Maximum queue size is not positive: " + maxQueueSize);
	}

	if (rejectedExecutionHandler == null) {
		throw new NullPointerException("Rejected execution handler is null");
	}

	if (threadFactory == null) {
		throw new NullPointerException("Thread factory is null");
	}

	if (threadPoolHandler == null) {
		throw new NullPointerException("Thread pool handler is null");
	}

	// Previously a null time unit only failed implicitly inside toNanos

	if (timeUnit == null) {
		throw new NullPointerException("Time unit is null");
	}

	_corePoolSize = corePoolSize;
	_maxPoolSize = maxPoolSize;
	_keepAliveTime = timeUnit.toNanos(keepAliveTime);
	_allowCoreThreadTimeout = allowCoreThreadTimeout;
	_rejectedExecutionHandler = rejectedExecutionHandler;
	_threadFactory = threadFactory;
	_threadPoolHandler = threadPoolHandler;
	_taskQueue = new TaskQueue<Runnable>(maxQueueSize);
	_workerTasks = new HashSet<WorkerTask>();
}
085
086 public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
087 if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
088 (newMaxPoolSize < newCorePoolSize)) {
089
090 throw new IllegalArgumentException();
091 }
092
093 _mainLock.lock();
094
095 try {
096 int surplusCoreThreads = _corePoolSize - newCorePoolSize;
097 int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
098
099 _corePoolSize = newCorePoolSize;
100 _maxPoolSize = newMaxPoolSize;
101
102 if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
103 ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
104
105 int interruptCount = Math.max(
106 surplusCoreThreads, surplusMaxPoolSize);
107
108 for (WorkerTask workerTask : _workerTasks) {
109 if (interruptCount > 0) {
110 if (workerTask._interruptIfWaiting()) {
111 interruptCount--;
112 }
113 }
114 else {
115 break;
116 }
117 }
118 }
119 else {
120 Runnable runnable = null;
121
122 while ((surplusCoreThreads++ < 0) &&
123 (_poolSize < _corePoolSize) &&
124 ((runnable = _taskQueue.poll()) != null)) {
125
126 _doAddWorkerThread(runnable);
127 }
128 }
129 }
130 finally {
131 _mainLock.unlock();
132 }
133 }
134
135 @Override
136 public boolean awaitTermination(long timeout, TimeUnit timeUnit)
137 throws InterruptedException {
138
139 long nanos = timeUnit.toNanos(timeout);
140
141 _mainLock.lock();
142
143 try {
144 while (true) {
145 if (_runState == _TERMINATED) {
146 return true;
147 }
148
149 if (nanos <= 0) {
150 return false;
151 }
152
153 nanos = _terminationCondition.awaitNanos(nanos);
154 }
155 }
156 finally {
157 _mainLock.unlock();
158 }
159 }
160
161 @Override
162 public void execute(Runnable runnable) {
163 if (runnable == null) {
164 throw new NullPointerException();
165 }
166
167 boolean[] hasWaiterMarker = new boolean[1];
168
169 if ((_runState == _RUNNING) &&
170 _taskQueue.offer(runnable, hasWaiterMarker)) {
171
172 if (_runState != _RUNNING) {
173 if (_taskQueue.remove(runnable)) {
174 _rejectedExecutionHandler.rejectedExecution(runnable, this);
175 }
176
177 return;
178 }
179
180 if (!hasWaiterMarker[0]) {
181 _addWorkerThread();
182 }
183
184 return;
185 }
186
187 _rejectedExecutionHandler.rejectedExecution(runnable, this);
188 }
189
190 public int getActiveCount() {
191 _mainLock.lock();
192
193 try {
194 int count = 0;
195
196 for (WorkerTask workerTask : _workerTasks) {
197 if (workerTask._isLocked()) {
198 count++;
199 }
200 }
201
202 return count;
203 }
204 finally {
205 _mainLock.unlock();
206 }
207 }
208
209 public long getCompletedTaskCount() {
210 _mainLock.lock();
211
212 try {
213 long count = _completedTaskCount;
214
215 for (WorkerTask workerTask : _workerTasks) {
216 count += workerTask._localCompletedTaskCount;
217 }
218
219 return count;
220 }
221 finally {
222 _mainLock.unlock();
223 }
224 }
225
226 public int getCorePoolSize() {
227 return _corePoolSize;
228 }
229
230 public long getKeepAliveTime(TimeUnit timeUnit) {
231 return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
232 }
233
234 public int getLargestPoolSize() {
235 return _largestPoolSize;
236 }
237
238 public int getMaxPoolSize() {
239 return _maxPoolSize;
240 }
241
242 public int getPendingTaskCount() {
243 return _taskQueue.size();
244 }
245
246 public int getPoolSize() {
247 return _poolSize;
248 }
249
250 public RejectedExecutionHandler getRejectedExecutionHandler() {
251 return _rejectedExecutionHandler;
252 }
253
254 public int getRemainingTaskQueueCapacity() {
255 return _taskQueue.remainingCapacity();
256 }
257
258 public long getTaskCount() {
259 _mainLock.lock();
260
261 try {
262 long count = _completedTaskCount;
263
264 for (WorkerTask workerTask : _workerTasks) {
265 count += workerTask._localCompletedTaskCount;
266
267 if (workerTask._isLocked()) {
268 count++;
269 }
270 }
271
272 return count + _taskQueue.size();
273 }
274 finally {
275 _mainLock.unlock();
276 }
277 }
278
279 public ThreadFactory getThreadFactory() {
280 return _threadFactory;
281 }
282
283 public ThreadPoolHandler getThreadPoolHandler() {
284 return _threadPoolHandler;
285 }
286
287 public boolean isAllowCoreThreadTimeout() {
288 return _allowCoreThreadTimeout;
289 }
290
291 @Override
292 public boolean isShutdown() {
293 if (_runState != _RUNNING) {
294 return true;
295 }
296 else {
297 return false;
298 }
299 }
300
301 @Override
302 public boolean isTerminated() {
303 if (_runState == _TERMINATED) {
304 return true;
305 }
306 else {
307 return false;
308 }
309 }
310
311 public boolean isTerminating() {
312 if (_runState == _STOP) {
313 return true;
314 }
315 else {
316 return false;
317 }
318 }
319
320 public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
321 _allowCoreThreadTimeout = allowCoreThreadTimeout;
322 }
323
324 public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
325 if (keepAliveTime < 0) {
326 throw new IllegalArgumentException();
327 }
328
329 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
330 }
331
332 public void setRejectedExecutionHandler(
333 RejectedExecutionHandler rejectedExecutionHandler) {
334
335 if (rejectedExecutionHandler == null) {
336 throw new NullPointerException();
337 }
338
339 _rejectedExecutionHandler = rejectedExecutionHandler;
340 }
341
342 public void setThreadFactory(ThreadFactory threadFactory) {
343 if (threadFactory == null) {
344 throw new NullPointerException();
345 }
346
347 _threadFactory = threadFactory;
348 }
349
350 public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
351 if (threadPoolHandler == null) {
352 throw new NullPointerException();
353 }
354
355 _threadPoolHandler = threadPoolHandler;
356 }
357
358 @Override
359 public void shutdown() {
360 _mainLock.lock();
361
362 try {
363 int state = _runState;
364
365 if (state < _SHUTDOWN) {
366 _runState = _SHUTDOWN;
367 }
368
369 for (WorkerTask workerTask : _workerTasks) {
370 workerTask._interruptIfWaiting();
371 }
372
373 _tryTerminate();
374 }
375 finally {
376 _mainLock.unlock();
377 }
378 }
379
380 @Override
381 public List<Runnable> shutdownNow() {
382 _mainLock.lock();
383
384 try {
385 int state = _runState;
386
387 if (state < _STOP) {
388 _runState = _STOP;
389 }
390
391 for (WorkerTask workerTask : _workerTasks) {
392 workerTask._thread.interrupt();
393 }
394
395 List<Runnable> runnables = new ArrayList<Runnable>();
396
397 _taskQueue.drainTo(runnables);
398
399 _tryTerminate();
400
401 return runnables;
402 }
403 finally {
404 _mainLock.unlock();
405 }
406 }
407
408 @Override
409 public <T> NoticeableFuture<T> submit(Callable<T> callable) {
410 if (callable == null) {
411 throw new NullPointerException("Callable is null");
412 }
413
414 DefaultNoticeableFuture<T> defaultNoticeableFuture = newTaskFor(
415 callable);
416
417 execute(defaultNoticeableFuture);
418
419 return defaultNoticeableFuture;
420 }
421
422 @Override
423 public NoticeableFuture<?> submit(Runnable runnable) {
424 return submit(runnable, null);
425 }
426
427 @Override
428 public <T> NoticeableFuture<T> submit(Runnable runnable, T result) {
429 if (runnable == null) {
430 throw new NullPointerException("Runnable is null");
431 }
432
433 DefaultNoticeableFuture<T> defaultNoticeableFuture = newTaskFor(
434 runnable, result);
435
436 execute(defaultNoticeableFuture);
437
438 return defaultNoticeableFuture;
439 }
440
441 @Override
442 protected void finalize() {
443 shutdown();
444 }
445
446 protected ReentrantLock getMainLock() {
447 return _mainLock;
448 }
449
450 protected TaskQueue<Runnable> getTaskQueue() {
451 return _taskQueue;
452 }
453
454 protected Set<WorkerTask> getWorkerTasks() {
455 return _workerTasks;
456 }
457
458 @Override
459 protected <T> DefaultNoticeableFuture<T> newTaskFor(Callable<T> callable) {
460 return new DefaultNoticeableFuture<T>(callable);
461 }
462
463 @Override
464 protected <T> DefaultNoticeableFuture<T> newTaskFor(
465 Runnable runnable, T value) {
466
467 return new DefaultNoticeableFuture<T>(runnable, value);
468 }
469
470 private void _addWorkerThread() {
471 int runState = _runState;
472 int poolSize = _poolSize;
473
474 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
475 ((runState == _SHUTDOWN) && (poolSize == 0) &&
476 !_taskQueue.isEmpty())) {
477
478 _mainLock.lock();
479
480 try {
481 runState = _runState;
482 poolSize = _poolSize;
483
484 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
485 ((runState == _SHUTDOWN) && (poolSize == 0) &&
486 !_taskQueue.isEmpty())) {
487
488 Runnable runnable = _taskQueue.poll();
489
490 if (runnable != null) {
491 _doAddWorkerThread(runnable);
492 }
493 }
494 }
495 finally {
496 _mainLock.unlock();
497 }
498 }
499 }
500
501 private void _doAddWorkerThread(Runnable runnable) {
502 WorkerTask workerTask = new WorkerTask(runnable);
503
504 _workerTasks.add(workerTask);
505
506 int poolSize = ++_poolSize;
507
508 if (poolSize > _largestPoolSize) {
509 _largestPoolSize = poolSize;
510 }
511
512 workerTask._startWork();
513 }
514
515 private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
516 while (true) {
517 try {
518 int state = _runState;
519
520 if (state >= _STOP) {
521 return null;
522 }
523
524 Runnable runnable = null;
525
526 if (state == _SHUTDOWN) {
527 runnable = _taskQueue.poll();
528 }
529 else if ((_poolSize > _corePoolSize) ||
530 _allowCoreThreadTimeout) {
531
532 runnable = _taskQueue.poll(
533 _keepAliveTime, TimeUnit.NANOSECONDS);
534 }
535 else {
536 runnable = _taskQueue.take();
537 }
538
539 if (runnable != null) {
540 return runnable;
541 }
542
543 _mainLock.lock();
544
545 try {
546 if ((_runState >= _STOP) ||
547 ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
548 (_allowCoreThreadTimeout &&
549 ((_poolSize > 1) || _taskQueue.isEmpty())) ||
550 (!_allowCoreThreadTimeout &&
551 (_poolSize > _corePoolSize))) {
552
553 _completedTaskCount +=
554 workerTask._localCompletedTaskCount;
555
556 _workerTasks.remove(workerTask);
557
558 if (--_poolSize == 0) {
559 _tryTerminate();
560 }
561
562 cleanUpMarker[0] = true;
563
564 return null;
565 }
566 }
567 finally {
568 _mainLock.unlock();
569 }
570 }
571 catch (InterruptedException ie) {
572 }
573 }
574 }
575
576 private void _tryTerminate() {
577 if (_poolSize == 0) {
578 int state = _runState;
579
580 if ((state == _STOP) ||
581 ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
582
583 _runState = _TERMINATED;
584
585 _terminationCondition.signalAll();
586 _threadPoolHandler.terminated();
587
588 return;
589 }
590
591 if (!_taskQueue.isEmpty()) {
592 _doAddWorkerThread(_taskQueue.poll());
593 }
594 }
595 }
596
597 private static final int _RUNNING = 0;
598
599 private static final int _SHUTDOWN = 1;
600
601 private static final int _STOP = 2;
602
603 private static final int _TERMINATED = 3;
604
605 private volatile boolean _allowCoreThreadTimeout;
606 private long _completedTaskCount;
607 private volatile int _corePoolSize;
608 private volatile long _keepAliveTime;
609 private volatile int _largestPoolSize;
610 private final ReentrantLock _mainLock = new ReentrantLock();
611 private volatile int _maxPoolSize;
612 private volatile int _poolSize;
613 private volatile RejectedExecutionHandler _rejectedExecutionHandler;
614 private volatile int _runState;
615 private final TaskQueue<Runnable> _taskQueue;
616 private final Condition _terminationCondition = _mainLock.newCondition();
617 private volatile ThreadFactory _threadFactory;
618 private volatile ThreadPoolHandler _threadPoolHandler;
619 private final Set<WorkerTask> _workerTasks;
620
621 private class WorkerTask
622 extends AbstractQueuedSynchronizer implements Runnable {
623
624 public WorkerTask(Runnable runnable) {
625 _runnable = runnable;
626 }
627
628 @Override
629 public void run() {
630 boolean[] cleanUpMarker = new boolean[1];
631
632 try {
633 Runnable runnable = _runnable;
634
635 _runnable = null;
636
637 do {
638 if (runnable != null) {
639 _runTask(runnable);
640
641 runnable = null;
642 }
643 }
644 while ((runnable = _getTask(this, cleanUpMarker)) != null);
645 }
646 finally {
647 if (!cleanUpMarker[0]) {
648 _mainLock.lock();
649
650 try {
651 _completedTaskCount += _localCompletedTaskCount;
652
653 _workerTasks.remove(this);
654
655 if (--_poolSize == 0) {
656 _tryTerminate();
657 }
658 }
659 finally {
660 _mainLock.unlock();
661 }
662 }
663
664 _threadPoolHandler.beforeThreadEnd(_thread);
665 }
666 }
667
668 @Override
669 protected boolean isHeldExclusively() {
670 if (getState() == 1) {
671 return true;
672 }
673 else {
674 return false;
675 }
676 }
677
678 @Override
679 protected boolean tryAcquire(int unused) {
680 return compareAndSetState(0, 1);
681 }
682
683 @Override
684 protected boolean tryRelease(int unused) {
685 setState(0);
686
687 return true;
688 }
689
690 private boolean _interruptIfWaiting() {
691 if (!_thread.isInterrupted() && tryAcquire(1)) {
692 try {
693 _thread.interrupt();
694
695 return true;
696 }
697 finally {
698 _unlock();
699 }
700 }
701
702 return false;
703 }
704
705 private boolean _isLocked() {
706 return isHeldExclusively();
707 }
708
709 private void _lock() {
710 acquire(1);
711 }
712
713 private void _runTask(Runnable task) {
714 _lock();
715
716 try {
717 if ((_runState < _STOP) && Thread.interrupted() &&
718 (_runState >= _STOP)) {
719
720 _thread.interrupt();
721 }
722
723 Throwable throwable = null;
724
725 _threadPoolHandler.beforeExecute(_thread, task);
726
727 try {
728 task.run();
729
730 _localCompletedTaskCount++;
731 }
732 catch (RuntimeException re) {
733 throwable = re;
734
735 throw re;
736 }
737 finally {
738 _threadPoolHandler.afterExecute(task, throwable);
739 }
740 }
741 finally {
742 _unlock();
743 }
744 }
745
746 private void _startWork() {
747 _thread = _threadFactory.newThread(this);
748
749 _threadPoolHandler.beforeThreadStart(_thread);
750
751 _thread.start();
752 }
753
754 private void _unlock() {
755 release(1);
756 }
757
758 private volatile long _localCompletedTaskCount;
759 private Runnable _runnable;
760 private Thread _thread;
761
762 }
763
764 }