001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.ArrayList;
018    import java.util.HashSet;
019    import java.util.List;
020    import java.util.Set;
021    import java.util.concurrent.AbstractExecutorService;
022    import java.util.concurrent.Callable;
023    import java.util.concurrent.Executors;
024    import java.util.concurrent.ThreadFactory;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
027    import java.util.concurrent.locks.Condition;
028    import java.util.concurrent.locks.ReentrantLock;
029    
030    /**
031     * <p>
032     * See https://issues.liferay.com/browse/LPS-14986.
033     * </p>
034     *
035     * @author Shuyang Zhou
036     */
037    public class ThreadPoolExecutor extends AbstractExecutorService {
038    
039            public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
040                    this(
041                            corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
042                            Integer.MAX_VALUE, new AbortPolicy(),
043                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
044            }
045    
046            public ThreadPoolExecutor(
047                    int corePoolSize, int maxPoolSize, long keepAliveTime,
048                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {
049    
050                    this(
051                            corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
052                            allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
053                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
054            }
055    
056            public ThreadPoolExecutor(
057                    int corePoolSize, int maxPoolSize, long keepAliveTime,
058                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
059                    RejectedExecutionHandler rejectedExecutionHandler,
060                    ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {
061    
062                    if ((corePoolSize < 0) || (maxPoolSize <= 0) ||
063                            (maxPoolSize < corePoolSize) || (keepAliveTime < 0) ||
064                            (maxQueueSize <= 0)) {
065    
066                            throw new IllegalArgumentException();
067                    }
068    
069                    if ((rejectedExecutionHandler == null) || (threadFactory == null) ||
070                            (threadPoolHandler == null)) {
071    
072                            throw new NullPointerException();
073                    }
074    
075                    _corePoolSize = corePoolSize;
076                    _maxPoolSize = maxPoolSize;
077                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
078                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
079                    _rejectedExecutionHandler = rejectedExecutionHandler;
080                    _threadFactory = threadFactory;
081                    _threadPoolHandler = threadPoolHandler;
082                    _taskQueue = new TaskQueue<>(maxQueueSize);
083                    _workerTasks = new HashSet<>();
084            }
085    
086            public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
087                    if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
088                            (newMaxPoolSize < newCorePoolSize)) {
089    
090                            throw new IllegalArgumentException();
091                    }
092    
093                    _mainLock.lock();
094    
095                    try {
096                            int surplusCoreThreads = _corePoolSize - newCorePoolSize;
097                            int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
098    
099                            _corePoolSize = newCorePoolSize;
100                            _maxPoolSize = newMaxPoolSize;
101    
102                            if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
103                                    ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
104    
105                                    int interruptCount = Math.max(
106                                            surplusCoreThreads, surplusMaxPoolSize);
107    
108                                    for (WorkerTask workerTask : _workerTasks) {
109                                            if (interruptCount > 0) {
110                                                    if (workerTask._interruptIfWaiting()) {
111                                                            interruptCount--;
112                                                    }
113                                            }
114                                            else {
115                                                    break;
116                                            }
117                                    }
118                            }
119                            else {
120                                    Runnable runnable = null;
121    
122                                    while ((surplusCoreThreads++ < 0) &&
123                                               (_poolSize < _corePoolSize) &&
124                                               ((runnable = _taskQueue.poll()) != null)) {
125    
126                                            _doAddWorkerThread(runnable);
127                                    }
128                            }
129                    }
130                    finally {
131                            _mainLock.unlock();
132                    }
133            }
134    
135            @Override
136            public boolean awaitTermination(long timeout, TimeUnit timeUnit)
137                    throws InterruptedException {
138    
139                    long nanos = timeUnit.toNanos(timeout);
140    
141                    _mainLock.lock();
142    
143                    try {
144                            while (true) {
145                                    if (_runState == _TERMINATED) {
146                                            return true;
147                                    }
148    
149                                    if (nanos <= 0) {
150                                            return false;
151                                    }
152    
153                                    nanos = _terminationCondition.awaitNanos(nanos);
154                            }
155                    }
156                    finally {
157                            _mainLock.unlock();
158                    }
159            }
160    
161            @Override
162            public void execute(Runnable runnable) {
163                    if (runnable == null) {
164                            throw new NullPointerException();
165                    }
166    
167                    boolean[] hasWaiterMarker = new boolean[1];
168    
169                    if ((_runState == _RUNNING) &&
170                            _taskQueue.offer(runnable, hasWaiterMarker)) {
171    
172                            if (_runState != _RUNNING) {
173                                    if (_taskQueue.remove(runnable)) {
174                                            _rejectedExecutionHandler.rejectedExecution(runnable, this);
175                                    }
176    
177                                    return;
178                            }
179    
180                            if (!hasWaiterMarker[0]) {
181                                    _addWorkerThread();
182                            }
183    
184                            return;
185                    }
186    
187                    _rejectedExecutionHandler.rejectedExecution(runnable, this);
188            }
189    
190            public int getActiveCount() {
191                    _mainLock.lock();
192    
193                    try {
194                            int count = 0;
195    
196                            for (WorkerTask workerTask : _workerTasks) {
197                                    if (workerTask._isLocked()) {
198                                            count++;
199                                    }
200                            }
201    
202                            return count;
203                    }
204                    finally {
205                            _mainLock.unlock();
206                    }
207            }
208    
209            public long getCompletedTaskCount() {
210                    _mainLock.lock();
211    
212                    try {
213                            long count = _completedTaskCount;
214    
215                            for (WorkerTask workerTask : _workerTasks) {
216                                    count += workerTask._localCompletedTaskCount;
217                            }
218    
219                            return count;
220                    }
221                    finally {
222                            _mainLock.unlock();
223                    }
224            }
225    
226            public int getCorePoolSize() {
227                    return _corePoolSize;
228            }
229    
230            public long getKeepAliveTime(TimeUnit timeUnit) {
231                    return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
232            }
233    
234            public int getLargestPoolSize() {
235                    return _largestPoolSize;
236            }
237    
238            public int getMaxPoolSize() {
239                    return _maxPoolSize;
240            }
241    
242            public String getName() {
243                    return _name;
244            }
245    
246            public int getPendingTaskCount() {
247                    return _taskQueue.size();
248            }
249    
250            public int getPoolSize() {
251                    return _poolSize;
252            }
253    
254            public RejectedExecutionHandler getRejectedExecutionHandler() {
255                    return _rejectedExecutionHandler;
256            }
257    
258            public int getRemainingTaskQueueCapacity() {
259                    return _taskQueue.remainingCapacity();
260            }
261    
262            public long getTaskCount() {
263                    _mainLock.lock();
264    
265                    try {
266                            long count = _completedTaskCount;
267    
268                            for (WorkerTask workerTask : _workerTasks) {
269                                    count += workerTask._localCompletedTaskCount;
270    
271                                    if (workerTask._isLocked()) {
272                                            count++;
273                                    }
274                            }
275    
276                            return count + _taskQueue.size();
277                    }
278                    finally {
279                            _mainLock.unlock();
280                    }
281            }
282    
283            public ThreadFactory getThreadFactory() {
284                    return _threadFactory;
285            }
286    
287            public ThreadPoolHandler getThreadPoolHandler() {
288                    return _threadPoolHandler;
289            }
290    
291            public boolean isAllowCoreThreadTimeout() {
292                    return _allowCoreThreadTimeout;
293            }
294    
295            @Override
296            public boolean isShutdown() {
297                    if (_runState != _RUNNING) {
298                            return true;
299                    }
300                    else {
301                            return false;
302                    }
303            }
304    
305            @Override
306            public boolean isTerminated() {
307                    if (_runState == _TERMINATED) {
308                            return true;
309                    }
310                    else {
311                            return false;
312                    }
313            }
314    
315            public boolean isTerminating() {
316                    if (_runState == _STOP) {
317                            return true;
318                    }
319                    else {
320                            return false;
321                    }
322            }
323    
324            public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
325                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
326            }
327    
328            public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
329                    if (keepAliveTime < 0) {
330                            throw new IllegalArgumentException();
331                    }
332    
333                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
334            }
335    
336            public void setName(String name) {
337                    _name = name;
338            }
339    
340            public void setRejectedExecutionHandler(
341                    RejectedExecutionHandler rejectedExecutionHandler) {
342    
343                    if (rejectedExecutionHandler == null) {
344                            throw new NullPointerException();
345                    }
346    
347                    _rejectedExecutionHandler = rejectedExecutionHandler;
348            }
349    
350            public void setThreadFactory(ThreadFactory threadFactory) {
351                    if (threadFactory == null) {
352                            throw new NullPointerException();
353                    }
354    
355                    _threadFactory = threadFactory;
356            }
357    
358            public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
359                    if (threadPoolHandler == null) {
360                            throw new NullPointerException();
361                    }
362    
363                    _threadPoolHandler = threadPoolHandler;
364            }
365    
366            @Override
367            public void shutdown() {
368                    _mainLock.lock();
369    
370                    try {
371                            int state = _runState;
372    
373                            if (state < _SHUTDOWN) {
374                                    _runState = _SHUTDOWN;
375                            }
376    
377                            for (WorkerTask workerTask : _workerTasks) {
378                                    workerTask._interruptIfWaiting();
379                            }
380    
381                            _tryTerminate();
382                    }
383                    finally {
384                            _mainLock.unlock();
385                    }
386            }
387    
388            @Override
389            public List<Runnable> shutdownNow() {
390                    _mainLock.lock();
391    
392                    try {
393                            int state = _runState;
394    
395                            if (state < _STOP) {
396                                    _runState = _STOP;
397                            }
398    
399                            for (WorkerTask workerTask : _workerTasks) {
400                                    workerTask._thread.interrupt();
401                            }
402    
403                            List<Runnable> runnables = new ArrayList<>();
404    
405                            _taskQueue.drainTo(runnables);
406    
407                            _tryTerminate();
408    
409                            return runnables;
410                    }
411                    finally {
412                            _mainLock.unlock();
413                    }
414            }
415    
416            @Override
417            public <T> NoticeableFuture<T> submit(Callable<T> callable) {
418                    if (callable == null) {
419                            throw new NullPointerException("Callable is null");
420                    }
421    
422                    DefaultNoticeableFuture<T> defaultNoticeableFuture = newTaskFor(
423                            callable);
424    
425                    execute(defaultNoticeableFuture);
426    
427                    return defaultNoticeableFuture;
428            }
429    
430            @Override
431            public NoticeableFuture<?> submit(Runnable runnable) {
432                    return submit(runnable, null);
433            }
434    
435            @Override
436            public <T> NoticeableFuture<T> submit(Runnable runnable, T result) {
437                    if (runnable == null) {
438                            throw new NullPointerException("Runnable is null");
439                    }
440    
441                    DefaultNoticeableFuture<T> defaultNoticeableFuture = newTaskFor(
442                            runnable, result);
443    
444                    execute(defaultNoticeableFuture);
445    
446                    return defaultNoticeableFuture;
447            }
448    
449            public NoticeableFuture<Void> terminationNoticeableFuture() {
450                    return _terminationDefaultNoticeableFuture;
451            }
452    
453            @Override
454            protected void finalize() {
455                    shutdown();
456            }
457    
458            protected ReentrantLock getMainLock() {
459                    return _mainLock;
460            }
461    
462            protected TaskQueue<Runnable> getTaskQueue() {
463                    return _taskQueue;
464            }
465    
466            protected Set<WorkerTask> getWorkerTasks() {
467                    return _workerTasks;
468            }
469    
470            @Override
471            protected <T> DefaultNoticeableFuture<T> newTaskFor(Callable<T> callable) {
472                    return new DefaultNoticeableFuture<>(callable);
473            }
474    
475            @Override
476            protected <T> DefaultNoticeableFuture<T> newTaskFor(
477                    Runnable runnable, T value) {
478    
479                    return new DefaultNoticeableFuture<>(runnable, value);
480            }
481    
482            private void _addWorkerThread() {
483                    int runState = _runState;
484                    int poolSize = _poolSize;
485    
486                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
487                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
488                             !_taskQueue.isEmpty())) {
489    
490                            _mainLock.lock();
491    
492                            try {
493                                    runState = _runState;
494                                    poolSize = _poolSize;
495    
496                                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
497                                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
498                                             !_taskQueue.isEmpty())) {
499    
500                                            Runnable runnable = _taskQueue.poll();
501    
502                                            if (runnable != null) {
503                                                    _doAddWorkerThread(runnable);
504                                            }
505                                    }
506                            }
507                            finally {
508                                    _mainLock.unlock();
509                            }
510                    }
511            }
512    
513            private void _doAddWorkerThread(Runnable runnable) {
514                    WorkerTask workerTask = new WorkerTask(runnable);
515    
516                    _workerTasks.add(workerTask);
517    
518                    int poolSize = ++_poolSize;
519    
520                    if (poolSize > _largestPoolSize) {
521                            _largestPoolSize = poolSize;
522                    }
523    
524                    workerTask._startWork();
525            }
526    
527            private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
528                    while (true) {
529                            try {
530                                    int state = _runState;
531    
532                                    if (state >= _STOP) {
533                                            return null;
534                                    }
535    
536                                    Runnable runnable = null;
537    
538                                    if (state == _SHUTDOWN) {
539                                            runnable = _taskQueue.poll();
540                                    }
541                                    else if ((_poolSize > _corePoolSize) ||
542                                                     _allowCoreThreadTimeout) {
543    
544                                            runnable = _taskQueue.poll(
545                                                    _keepAliveTime, TimeUnit.NANOSECONDS);
546                                    }
547                                    else {
548                                            runnable = _taskQueue.take();
549                                    }
550    
551                                    if (runnable != null) {
552                                            return runnable;
553                                    }
554    
555                                    _mainLock.lock();
556    
557                                    try {
558                                            if ((_runState >= _STOP) ||
559                                                    ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
560                                                    (_allowCoreThreadTimeout &&
561                                                     ((_poolSize > 1) || _taskQueue.isEmpty())) ||
562                                                    (!_allowCoreThreadTimeout &&
563                                                     (_poolSize > _corePoolSize))) {
564    
565                                                    _completedTaskCount +=
566                                                            workerTask._localCompletedTaskCount;
567    
568                                                    _workerTasks.remove(workerTask);
569    
570                                                    if (--_poolSize == 0) {
571                                                            _tryTerminate();
572                                                    }
573    
574                                                    cleanUpMarker[0] = true;
575    
576                                                    return null;
577                                            }
578                                    }
579                                    finally {
580                                            _mainLock.unlock();
581                                    }
582                            }
583                            catch (InterruptedException ie) {
584                            }
585                    }
586            }
587    
588            private void _tryTerminate() {
589                    if (_poolSize == 0) {
590                            int state = _runState;
591    
592                            if ((state == _STOP) ||
593                                    ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
594    
595                                    _runState = _TERMINATED;
596    
597                                    _terminationCondition.signalAll();
598    
599                                    _threadPoolHandler.terminated();
600    
601                                    _terminationDefaultNoticeableFuture.run();
602    
603                                    return;
604                            }
605    
606                            if (!_taskQueue.isEmpty()) {
607                                    _doAddWorkerThread(_taskQueue.poll());
608                            }
609                    }
610            }
611    
612            private static final int _RUNNING = 0;
613    
614            private static final int _SHUTDOWN = 1;
615    
616            private static final int _STOP = 2;
617    
618            private static final int _TERMINATED = 3;
619    
620            private volatile boolean _allowCoreThreadTimeout;
621            private long _completedTaskCount;
622            private volatile int _corePoolSize;
623            private volatile long _keepAliveTime;
624            private volatile int _largestPoolSize;
625            private final ReentrantLock _mainLock = new ReentrantLock();
626            private volatile int _maxPoolSize;
627            private String _name;
628            private volatile int _poolSize;
629            private volatile RejectedExecutionHandler _rejectedExecutionHandler;
630            private volatile int _runState;
631            private final TaskQueue<Runnable> _taskQueue;
632            private final Condition _terminationCondition = _mainLock.newCondition();
633    
634            private final DefaultNoticeableFuture<Void>
635                    _terminationDefaultNoticeableFuture =
636                            new DefaultNoticeableFuture<Void>() {
637    
638                                    @Override
639                                    public boolean cancel(boolean mayInterruptIfRunning) {
640                                            return false;
641                                    }
642    
643                            };
644    
645            private volatile ThreadFactory _threadFactory;
646            private volatile ThreadPoolHandler _threadPoolHandler;
647            private final Set<WorkerTask> _workerTasks;
648    
649            private class WorkerTask
650                    extends AbstractQueuedSynchronizer implements Runnable {
651    
652                    public WorkerTask(Runnable runnable) {
653                            _runnable = runnable;
654                    }
655    
656                    @Override
657                    public void run() {
658                            boolean[] cleanUpMarker = new boolean[1];
659    
660                            try {
661                                    Runnable runnable = _runnable;
662    
663                                    _runnable = null;
664    
665                                    do {
666                                            if (runnable != null) {
667                                                    _runTask(runnable);
668    
669                                                    runnable = null;
670                                            }
671                                    }
672                                    while ((runnable = _getTask(this, cleanUpMarker)) != null);
673                            }
674                            finally {
675                                    if (!cleanUpMarker[0]) {
676                                            _mainLock.lock();
677    
678                                            try {
679                                                    _completedTaskCount += _localCompletedTaskCount;
680    
681                                                    _workerTasks.remove(this);
682    
683                                                    if (--_poolSize == 0) {
684                                                            _tryTerminate();
685                                                    }
686                                            }
687                                            finally {
688                                                    _mainLock.unlock();
689                                            }
690                                    }
691    
692                                    _threadPoolHandler.beforeThreadEnd(_thread);
693                            }
694                    }
695    
696                    @Override
697                    protected boolean isHeldExclusively() {
698                            if (getState() == 1) {
699                                    return true;
700                            }
701                            else {
702                                    return false;
703                            }
704                    }
705    
706                    @Override
707                    protected boolean tryAcquire(int unused) {
708                            return compareAndSetState(0, 1);
709                    }
710    
711                    @Override
712                    protected boolean tryRelease(int unused) {
713                            setState(0);
714    
715                            return true;
716                    }
717    
718                    private boolean _interruptIfWaiting() {
719                            if (!_thread.isInterrupted() && tryAcquire(1)) {
720                                    try {
721                                            _thread.interrupt();
722    
723                                            return true;
724                                    }
725                                    finally {
726                                            _unlock();
727                                    }
728                            }
729    
730                            return false;
731                    }
732    
733                    private boolean _isLocked() {
734                            return isHeldExclusively();
735                    }
736    
737                    private void _lock() {
738                            acquire(1);
739                    }
740    
741                    private void _runTask(Runnable task) {
742                            _lock();
743    
744                            try {
745                                    if ((_runState < _STOP) && Thread.interrupted() &&
746                                            (_runState >= _STOP)) {
747    
748                                            _thread.interrupt();
749                                    }
750    
751                                    Throwable throwable = null;
752    
753                                    _threadPoolHandler.beforeExecute(_thread, task);
754    
755                                    try {
756                                            task.run();
757    
758                                            _localCompletedTaskCount++;
759                                    }
760                                    catch (RuntimeException re) {
761                                            throwable = re;
762    
763                                            throw re;
764                                    }
765                                    finally {
766                                            _threadPoolHandler.afterExecute(task, throwable);
767                                    }
768                            }
769                            finally {
770                                    _unlock();
771                            }
772                    }
773    
774                    private void _startWork() {
775                            _thread = _threadFactory.newThread(this);
776    
777                            _threadPoolHandler.beforeThreadStart(_thread);
778    
779                            _thread.start();
780                    }
781    
782                    private void _unlock() {
783                            release(1);
784                    }
785    
786                    private volatile long _localCompletedTaskCount;
787                    private Runnable _runnable;
788                    private Thread _thread;
789    
790            }
791    
792    }