001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.ArrayList;
018    import java.util.HashSet;
019    import java.util.List;
020    import java.util.Set;
021    import java.util.concurrent.AbstractExecutorService;
022    import java.util.concurrent.Callable;
023    import java.util.concurrent.Executors;
024    import java.util.concurrent.ThreadFactory;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
027    import java.util.concurrent.locks.Condition;
028    import java.util.concurrent.locks.ReentrantLock;
029    
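/**
 * <p>
 * An executor service backed by a pool of worker threads that take their
 * tasks from a {@code TaskQueue} created with the configured maximum queue
 * size. Pool sizes, keep alive time, rejection handler, thread factory, and
 * thread pool handler can be changed after construction.
 * </p>
 *
 * <p>
 * See https://issues.liferay.com/browse/LPS-14986.
 * </p>
 *
 * @author Shuyang Zhou
 */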
037    public class ThreadPoolExecutor extends AbstractExecutorService {
038    
039            public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
040                    this(
041                            corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
042                            Integer.MAX_VALUE, new AbortPolicy(),
043                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
044            }
045    
046            public ThreadPoolExecutor(
047                    int corePoolSize, int maxPoolSize, long keepAliveTime,
048                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {
049    
050                    this(
051                            corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
052                            allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
053                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
054            }
055    
056            public ThreadPoolExecutor(
057                    int corePoolSize, int maxPoolSize, long keepAliveTime,
058                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
059                    RejectedExecutionHandler rejectedExecutionHandler,
060                    ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {
061    
062                    if ((corePoolSize < 0) || (maxPoolSize <= 0) ||
063                            (maxPoolSize < corePoolSize) || (keepAliveTime < 0) ||
064                            (maxQueueSize <= 0)) {
065    
066                            throw new IllegalArgumentException();
067                    }
068    
069                    if ((rejectedExecutionHandler == null) || (threadFactory == null) ||
070                            (threadPoolHandler == null)) {
071    
072                            throw new NullPointerException();
073                    }
074    
075                    _corePoolSize = corePoolSize;
076                    _maxPoolSize = maxPoolSize;
077                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
078                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
079                    _rejectedExecutionHandler = rejectedExecutionHandler;
080                    _threadFactory = threadFactory;
081                    _threadPoolHandler = threadPoolHandler;
082                    _taskQueue = new TaskQueue<Runnable>(maxQueueSize);
083                    _workerTasks = new HashSet<WorkerTask>();
084            }
085    
086            public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
087                    if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
088                            (newMaxPoolSize < newCorePoolSize)) {
089    
090                            throw new IllegalArgumentException();
091                    }
092    
093                    _mainLock.lock();
094    
095                    try {
096                            int surplusCoreThreads = _corePoolSize - newCorePoolSize;
097                            int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
098    
099                            _corePoolSize = newCorePoolSize;
100                            _maxPoolSize = newMaxPoolSize;
101    
102                            if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
103                                    ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
104    
105                                    int interruptCount = Math.max(
106                                            surplusCoreThreads, surplusMaxPoolSize);
107    
108                                    for (WorkerTask workerTask : _workerTasks) {
109                                            if (interruptCount > 0) {
110                                                    if (workerTask._interruptIfWaiting()) {
111                                                            interruptCount--;
112                                                    }
113                                            }
114                                            else {
115                                                    break;
116                                            }
117                                    }
118                            }
119                            else {
120                                    Runnable runnable = null;
121    
122                                    while ((surplusCoreThreads++ < 0) &&
123                                               (_poolSize < _corePoolSize) &&
124                                               ((runnable = _taskQueue.poll()) != null)) {
125    
126                                            _doAddWorkerThread(runnable);
127                                    }
128                            }
129                    }
130                    finally {
131                            _mainLock.unlock();
132                    }
133            }
134    
135            @Override
136            public boolean awaitTermination(long timeout, TimeUnit timeUnit)
137                    throws InterruptedException {
138    
139                    long nanos = timeUnit.toNanos(timeout);
140    
141                    _mainLock.lock();
142    
143                    try {
144                            while (true) {
145                                    if (_runState == _TERMINATED) {
146                                            return true;
147                                    }
148    
149                                    if (nanos <= 0) {
150                                            return false;
151                                    }
152    
153                                    nanos = _terminationCondition.awaitNanos(nanos);
154                            }
155                    }
156                    finally {
157                            _mainLock.unlock();
158                    }
159            }
160    
161            @Override
162            public void execute(Runnable runnable) {
163                    if (runnable == null) {
164                            throw new NullPointerException();
165                    }
166    
167                    boolean[] hasWaiterMarker = new boolean[1];
168    
169                    if ((_runState == _RUNNING) &&
170                            _taskQueue.offer(runnable, hasWaiterMarker)) {
171    
172                            if (_runState != _RUNNING) {
173                                    if (_taskQueue.remove(runnable)) {
174                                            _rejectedExecutionHandler.rejectedExecution(runnable, this);
175                                    }
176    
177                                    return;
178                            }
179    
180                            if (!hasWaiterMarker[0]) {
181                                    _addWorkerThread();
182                            }
183    
184                            return;
185                    }
186    
187                    _rejectedExecutionHandler.rejectedExecution(runnable, this);
188            }
189    
190            public int getActiveCount() {
191                    _mainLock.lock();
192    
193                    try {
194                            int count = 0;
195    
196                            for (WorkerTask workerTask : _workerTasks) {
197                                    if (workerTask._isLocked()) {
198                                            count++;
199                                    }
200                            }
201    
202                            return count;
203                    }
204                    finally {
205                            _mainLock.unlock();
206                    }
207            }
208    
209            public long getCompletedTaskCount() {
210                    _mainLock.lock();
211    
212                    try {
213                            long count = _completedTaskCount;
214    
215                            for (WorkerTask workerTask : _workerTasks) {
216                                    count += workerTask._localCompletedTaskCount;
217                            }
218    
219                            return count;
220                    }
221                    finally {
222                            _mainLock.unlock();
223                    }
224            }
225    
226            public int getCorePoolSize() {
227                    return _corePoolSize;
228            }
229    
230            public long getKeepAliveTime(TimeUnit timeUnit) {
231                    return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
232            }
233    
234            public int getLargestPoolSize() {
235                    return _largestPoolSize;
236            }
237    
238            public int getMaxPoolSize() {
239                    return _maxPoolSize;
240            }
241    
242            public int getPendingTaskCount() {
243                    return _taskQueue.size();
244            }
245    
246            public int getPoolSize() {
247                    return _poolSize;
248            }
249    
250            public RejectedExecutionHandler getRejectedExecutionHandler() {
251                    return _rejectedExecutionHandler;
252            }
253    
254            public int getRemainingTaskQueueCapacity() {
255                    return _taskQueue.remainingCapacity();
256            }
257    
258            public long getTaskCount() {
259                    _mainLock.lock();
260    
261                    try {
262                            long count = _completedTaskCount;
263    
264                            for (WorkerTask workerTask : _workerTasks) {
265                                    count += workerTask._localCompletedTaskCount;
266    
267                                    if (workerTask._isLocked()) {
268                                            count++;
269                                    }
270                            }
271    
272                            return count + _taskQueue.size();
273                    }
274                    finally {
275                            _mainLock.unlock();
276                    }
277            }
278    
279            public ThreadFactory getThreadFactory() {
280                    return _threadFactory;
281            }
282    
283            public ThreadPoolHandler getThreadPoolHandler() {
284                    return _threadPoolHandler;
285            }
286    
287            public boolean isAllowCoreThreadTimeout() {
288                    return _allowCoreThreadTimeout;
289            }
290    
291            @Override
292            public boolean isShutdown() {
293                    if (_runState != _RUNNING) {
294                            return true;
295                    }
296                    else {
297                            return false;
298                    }
299            }
300    
301            @Override
302            public boolean isTerminated() {
303                    if (_runState == _TERMINATED) {
304                            return true;
305                    }
306                    else {
307                            return false;
308                    }
309            }
310    
311            public boolean isTerminating() {
312                    if (_runState == _STOP) {
313                            return true;
314                    }
315                    else {
316                            return false;
317                    }
318            }
319    
320            public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
321                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
322            }
323    
324            public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
325                    if (keepAliveTime < 0) {
326                            throw new IllegalArgumentException();
327                    }
328    
329                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
330            }
331    
332            public void setRejectedExecutionHandler(
333                    RejectedExecutionHandler rejectedExecutionHandler) {
334    
335                    if (rejectedExecutionHandler == null) {
336                            throw new NullPointerException();
337                    }
338    
339                    _rejectedExecutionHandler = rejectedExecutionHandler;
340            }
341    
342            public void setThreadFactory(ThreadFactory threadFactory) {
343                    if (threadFactory == null) {
344                            throw new NullPointerException();
345                    }
346    
347                    _threadFactory = threadFactory;
348            }
349    
350            public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
351                    if (threadPoolHandler == null) {
352                            throw new NullPointerException();
353                    }
354    
355                    _threadPoolHandler = threadPoolHandler;
356            }
357    
358            @Override
359            public void shutdown() {
360                    _mainLock.lock();
361    
362                    try {
363                            int state = _runState;
364    
365                            if (state < _SHUTDOWN) {
366                                    _runState = _SHUTDOWN;
367                            }
368    
369                            for (WorkerTask workerTask : _workerTasks) {
370                                    workerTask._interruptIfWaiting();
371                            }
372    
373                            _tryTerminate();
374                    }
375                    finally {
376                            _mainLock.unlock();
377                    }
378            }
379    
380            @Override
381            public List<Runnable> shutdownNow() {
382                    _mainLock.lock();
383    
384                    try {
385                            int state = _runState;
386    
387                            if (state < _STOP) {
388                                    _runState = _STOP;
389                            }
390    
391                            for (WorkerTask workerTask : _workerTasks) {
392                                    workerTask._thread.interrupt();
393                            }
394    
395                            List<Runnable> runnables = new ArrayList<Runnable>();
396    
397                            _taskQueue.drainTo(runnables);
398    
399                            _tryTerminate();
400    
401                            return runnables;
402                    }
403                    finally {
404                            _mainLock.unlock();
405                    }
406            }
407    
408            @Override
409            public <T> NoticeableFuture<T> submit(Callable<T> callable) {
410                    if (callable == null) {
411                            throw new NullPointerException("Callable is null");
412                    }
413    
414                    DefaultNoticeableFuture<T> defaultNoticeableFuture = newTaskFor(
415                            callable);
416    
417                    execute(defaultNoticeableFuture);
418    
419                    return defaultNoticeableFuture;
420            }
421    
422            @Override
423            public NoticeableFuture<?> submit(Runnable runnable) {
424                    return submit(runnable, null);
425            }
426    
427            @Override
428            public <T> NoticeableFuture<T> submit(Runnable runnable, T result) {
429                    if (runnable == null) {
430                            throw new NullPointerException("Runnable is null");
431                    }
432    
433                    DefaultNoticeableFuture<T> defaultNoticeableFuture = newTaskFor(
434                            runnable, result);
435    
436                    execute(defaultNoticeableFuture);
437    
438                    return defaultNoticeableFuture;
439            }
440    
441            @Override
442            protected void finalize() {
443                    shutdown();
444            }
445    
446            protected ReentrantLock getMainLock() {
447                    return _mainLock;
448            }
449    
450            protected TaskQueue<Runnable> getTaskQueue() {
451                    return _taskQueue;
452            }
453    
454            protected Set<WorkerTask> getWorkerTasks() {
455                    return _workerTasks;
456            }
457    
458            @Override
459            protected <T> DefaultNoticeableFuture<T> newTaskFor(Callable<T> callable) {
460                    return new DefaultNoticeableFuture<T>(callable);
461            }
462    
463            @Override
464            protected <T> DefaultNoticeableFuture<T> newTaskFor(
465                    Runnable runnable, T value) {
466    
467                    return new DefaultNoticeableFuture<T>(runnable, value);
468            }
469    
470            private void _addWorkerThread() {
471                    int runState = _runState;
472                    int poolSize = _poolSize;
473    
474                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
475                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
476                             !_taskQueue.isEmpty())) {
477    
478                            _mainLock.lock();
479    
480                            try {
481                                    runState = _runState;
482                                    poolSize = _poolSize;
483    
484                                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
485                                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
486                                             !_taskQueue.isEmpty())) {
487    
488                                            Runnable runnable = _taskQueue.poll();
489    
490                                            if (runnable != null) {
491                                                    _doAddWorkerThread(runnable);
492                                            }
493                                    }
494                            }
495                            finally {
496                                    _mainLock.unlock();
497                            }
498                    }
499            }
500    
501            private void _doAddWorkerThread(Runnable runnable) {
502                    WorkerTask workerTask = new WorkerTask(runnable);
503    
504                    _workerTasks.add(workerTask);
505    
506                    int poolSize = ++_poolSize;
507    
508                    if (poolSize > _largestPoolSize) {
509                            _largestPoolSize = poolSize;
510                    }
511    
512                    workerTask._startWork();
513            }
514    
515            private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
516                    while (true) {
517                            try {
518                                    int state = _runState;
519    
520                                    if (state >= _STOP) {
521                                            return null;
522                                    }
523    
524                                    Runnable runnable = null;
525    
526                                    if (state == _SHUTDOWN) {
527                                            runnable = _taskQueue.poll();
528                                    }
529                                    else if ((_poolSize > _corePoolSize) ||
530                                                     _allowCoreThreadTimeout) {
531    
532                                            runnable = _taskQueue.poll(
533                                                    _keepAliveTime, TimeUnit.NANOSECONDS);
534                                    }
535                                    else {
536                                            runnable = _taskQueue.take();
537                                    }
538    
539                                    if (runnable != null) {
540                                            return runnable;
541                                    }
542    
543                                    _mainLock.lock();
544    
545                                    try {
546                                            if ((_runState >= _STOP) ||
547                                                    ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
548                                                    (_allowCoreThreadTimeout &&
549                                                     ((_poolSize > 1) || _taskQueue.isEmpty())) ||
550                                                    (!_allowCoreThreadTimeout &&
551                                                     (_poolSize > _corePoolSize))) {
552    
553                                                    _completedTaskCount +=
554                                                            workerTask._localCompletedTaskCount;
555    
556                                                    _workerTasks.remove(workerTask);
557    
558                                                    if (--_poolSize == 0) {
559                                                            _tryTerminate();
560                                                    }
561    
562                                                    cleanUpMarker[0] = true;
563    
564                                                    return null;
565                                            }
566                                    }
567                                    finally {
568                                            _mainLock.unlock();
569                                    }
570                            }
571                            catch (InterruptedException ie) {
572                            }
573                    }
574            }
575    
576            private void _tryTerminate() {
577                    if (_poolSize == 0) {
578                            int state = _runState;
579    
580                            if ((state == _STOP) ||
581                                    ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
582    
583                                    _runState = _TERMINATED;
584    
585                                    _terminationCondition.signalAll();
586                                    _threadPoolHandler.terminated();
587    
588                                    return;
589                            }
590    
591                            if (!_taskQueue.isEmpty()) {
592                                    _doAddWorkerThread(_taskQueue.poll());
593                            }
594                    }
595            }
596    
597            private static final int _RUNNING = 0;
598    
599            private static final int _SHUTDOWN = 1;
600    
601            private static final int _STOP = 2;
602    
603            private static final int _TERMINATED = 3;
604    
605            private volatile boolean _allowCoreThreadTimeout;
606            private long _completedTaskCount;
607            private volatile int _corePoolSize;
608            private volatile long _keepAliveTime;
609            private volatile int _largestPoolSize;
610            private final ReentrantLock _mainLock = new ReentrantLock();
611            private volatile int _maxPoolSize;
612            private volatile int _poolSize;
613            private volatile RejectedExecutionHandler _rejectedExecutionHandler;
614            private volatile int _runState;
615            private final TaskQueue<Runnable> _taskQueue;
616            private final Condition _terminationCondition = _mainLock.newCondition();
617            private volatile ThreadFactory _threadFactory;
618            private volatile ThreadPoolHandler _threadPoolHandler;
619            private final Set<WorkerTask> _workerTasks;
620    
621            private class WorkerTask
622                    extends AbstractQueuedSynchronizer implements Runnable {
623    
624                    public WorkerTask(Runnable runnable) {
625                            _runnable = runnable;
626                    }
627    
628                    @Override
629                    public void run() {
630                            boolean[] cleanUpMarker = new boolean[1];
631    
632                            try {
633                                    Runnable runnable = _runnable;
634    
635                                    _runnable = null;
636    
637                                    do {
638                                            if (runnable != null) {
639                                                    _runTask(runnable);
640    
641                                                    runnable = null;
642                                            }
643                                    }
644                                    while ((runnable = _getTask(this, cleanUpMarker)) != null);
645                            }
646                            finally {
647                                    if (!cleanUpMarker[0]) {
648                                            _mainLock.lock();
649    
650                                            try {
651                                                    _completedTaskCount += _localCompletedTaskCount;
652    
653                                                    _workerTasks.remove(this);
654    
655                                                    if (--_poolSize == 0) {
656                                                            _tryTerminate();
657                                                    }
658                                            }
659                                            finally {
660                                                    _mainLock.unlock();
661                                            }
662                                    }
663    
664                                    _threadPoolHandler.beforeThreadEnd(_thread);
665                            }
666                    }
667    
668                    @Override
669                    protected boolean isHeldExclusively() {
670                            if (getState() == 1) {
671                                    return true;
672                            }
673                            else {
674                                    return false;
675                            }
676                    }
677    
678                    @Override
679                    protected boolean tryAcquire(int unused) {
680                            return compareAndSetState(0, 1);
681                    }
682    
683                    @Override
684                    protected boolean tryRelease(int unused) {
685                            setState(0);
686    
687                            return true;
688                    }
689    
690                    private boolean _interruptIfWaiting() {
691                            if (!_thread.isInterrupted() && tryAcquire(1)) {
692                                    try {
693                                            _thread.interrupt();
694    
695                                            return true;
696                                    }
697                                    finally {
698                                            _unlock();
699                                    }
700                            }
701    
702                            return false;
703                    }
704    
705                    private boolean _isLocked() {
706                            return isHeldExclusively();
707                    }
708    
709                    private void _lock() {
710                            acquire(1);
711                    }
712    
713                    private void _runTask(Runnable task) {
714                            _lock();
715    
716                            try {
717                                    if ((_runState < _STOP) && Thread.interrupted() &&
718                                            (_runState >= _STOP)) {
719    
720                                            _thread.interrupt();
721                                    }
722    
723                                    Throwable throwable = null;
724    
725                                    _threadPoolHandler.beforeExecute(_thread, task);
726    
727                                    try {
728                                            task.run();
729    
730                                            _localCompletedTaskCount++;
731                                    }
732                                    catch (RuntimeException re) {
733                                            throwable = re;
734    
735                                            throw re;
736                                    }
737                                    finally {
738                                            _threadPoolHandler.afterExecute(task, throwable);
739                                    }
740                            }
741                            finally {
742                                    _unlock();
743                            }
744                    }
745    
746                    private void _startWork() {
747                            _thread = _threadFactory.newThread(this);
748    
749                            _threadPoolHandler.beforeThreadStart(_thread);
750    
751                            _thread.start();
752                    }
753    
754                    private void _unlock() {
755                            release(1);
756                    }
757    
758                    private volatile long _localCompletedTaskCount;
759                    private Runnable _runnable;
760                    private Thread _thread;
761    
762            }
763    
764    }