001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.ArrayList;
018    import java.util.HashSet;
019    import java.util.List;
020    import java.util.Set;
021    import java.util.concurrent.AbstractExecutorService;
022    import java.util.concurrent.Executors;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
026    import java.util.concurrent.locks.Condition;
027    import java.util.concurrent.locks.ReentrantLock;
028    
029    /**
030     * <p>
031     * See http://issues.liferay.com/browse/LPS-14986.
032     * </p>
033     *
034     * @author Shuyang Zhou
035     */
036    public class ThreadPoolExecutor extends AbstractExecutorService {
037    
038            public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
039                    this(
040                            corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
041                            Integer.MAX_VALUE, new AbortPolicy(),
042                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
043            }
044    
045            public ThreadPoolExecutor(
046                    int corePoolSize, int maxPoolSize, long keepAliveTime,
047                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {
048    
049                    this(
050                            corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
051                            allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
052                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
053            }
054    
055            public ThreadPoolExecutor(
056                    int corePoolSize, int maxPoolSize, long keepAliveTime,
057                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
058                    RejectedExecutionHandler rejectedExecutionHandler,
059                    ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {
060    
061                    if ((corePoolSize < 0) || (maxPoolSize <= 0) ||
062                            (maxPoolSize < corePoolSize) || (keepAliveTime < 0) ||
063                            (maxQueueSize <= 0)) {
064    
065                            throw new IllegalArgumentException();
066                    }
067    
068                    if ((rejectedExecutionHandler == null) || (threadFactory == null) ||
069                            (threadPoolHandler == null)) {
070    
071                            throw new NullPointerException();
072                    }
073    
074                    _corePoolSize = corePoolSize;
075                    _maxPoolSize = maxPoolSize;
076                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
077                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
078                    _rejectedExecutionHandler = rejectedExecutionHandler;
079                    _threadFactory = threadFactory;
080                    _threadPoolHandler = threadPoolHandler;
081                    _taskQueue = new TaskQueue<Runnable>(maxQueueSize);
082                    _workerTasks = new HashSet<WorkerTask>();
083            }
084    
085            public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
086                    if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
087                            (newMaxPoolSize < newCorePoolSize)) {
088    
089                            throw new IllegalArgumentException();
090                    }
091    
092                    _mainLock.lock();
093    
094                    try {
095                            int surplusCoreThreads = _corePoolSize - newCorePoolSize;
096                            int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
097    
098                            _corePoolSize = newCorePoolSize;
099                            _maxPoolSize = newMaxPoolSize;
100    
101                            if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
102                                    ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
103    
104                                    int interruptCount = Math.max(
105                                            surplusCoreThreads, surplusMaxPoolSize);
106    
107                                    for (WorkerTask workerTask : _workerTasks) {
108                                            if (interruptCount > 0) {
109                                                    if (workerTask._interruptIfWaiting()) {
110                                                            interruptCount--;
111                                                    }
112                                            }
113                                            else {
114                                                    break;
115                                            }
116                                    }
117                            }
118                            else {
119                                    Runnable runnable = null;
120    
121                                    while ((surplusCoreThreads++ < 0) &&
122                                               (_poolSize < _corePoolSize) &&
123                                               ((runnable = _taskQueue.poll()) != null)) {
124    
125                                            _doAddWorkerThread(runnable);
126                                    }
127                            }
128                    }
129                    finally {
130                            _mainLock.unlock();
131                    }
132            }
133    
134            public boolean awaitTermination(long timeout, TimeUnit timeUnit)
135                    throws InterruptedException {
136    
137                    long nanos = timeUnit.toNanos(timeout);
138    
139                    _mainLock.lock();
140    
141                    try {
142                            while (true) {
143                                    if (_runState == _TERMINATED) {
144                                            return true;
145                                    }
146    
147                                    if (nanos <= 0) {
148                                            return false;
149                                    }
150    
151                                    nanos = _terminationCondition.awaitNanos(nanos);
152                            }
153                    }
154                    finally {
155                            _mainLock.unlock();
156                    }
157            }
158    
159            public void execute(Runnable runnable) {
160                    if (runnable == null) {
161                            throw new NullPointerException();
162                    }
163    
164                    boolean[] hasWaiterMarker = new boolean[1];
165    
166                    if ((_runState == _RUNNING) &&
167                            _taskQueue.offer(runnable, hasWaiterMarker)) {
168    
169                            if (_runState != _RUNNING) {
170                                    if (_taskQueue.remove(runnable)) {
171                                            _rejectedExecutionHandler.rejectedExecution(runnable, this);
172                                    }
173    
174                                    return;
175                            }
176    
177                            if (!hasWaiterMarker[0]) {
178                                    _addWorkerThread();
179                            }
180    
181                            return;
182                    }
183    
184                    _rejectedExecutionHandler.rejectedExecution(runnable, this);
185            }
186    
187            public int getActiveCount() {
188                    _mainLock.lock();
189    
190                    try {
191                            int count = 0;
192    
193                            for (WorkerTask workerTask : _workerTasks) {
194                                    if (workerTask._isLocked()) {
195                                            count++;
196                                    }
197                            }
198    
199                            return count;
200                    }
201                    finally {
202                            _mainLock.unlock();
203                    }
204            }
205    
206            public long getCompletedTaskCount() {
207                    _mainLock.lock();
208    
209                    try {
210                            long count = _completedTaskCount;
211    
212                            for (WorkerTask workerTask : _workerTasks) {
213                                    count += workerTask._localCompletedTaskCount;
214                            }
215    
216                            return count;
217                    }
218                    finally {
219                            _mainLock.unlock();
220                    }
221            }
222    
223            public int getCorePoolSize() {
224                    return _corePoolSize;
225            }
226    
227            public long getKeepAliveTime(TimeUnit timeUnit) {
228                    return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
229            }
230    
231            public int getLargestPoolSize() {
232                    return _largestPoolSize;
233            }
234    
235            public int getMaxPoolSize() {
236                    return _maxPoolSize;
237            }
238    
239            public int getPendingTaskCount() {
240                    return _taskQueue.size();
241            }
242    
243            public int getPoolSize() {
244                    return _poolSize;
245            }
246    
247            public RejectedExecutionHandler getRejectedExecutionHandler() {
248                    return _rejectedExecutionHandler;
249            }
250    
251            public int getRemainingTaskQueueCapacity() {
252                    return _taskQueue.remainingCapacity();
253            }
254    
255            public long getTaskCount() {
256                    _mainLock.lock();
257    
258                    try {
259                            long count = _completedTaskCount;
260    
261                            for (WorkerTask workerTask : _workerTasks) {
262                                    count += workerTask._localCompletedTaskCount;
263    
264                                    if (workerTask._isLocked()) {
265                                            count++;
266                                    }
267                            }
268    
269                            return count + _taskQueue.size();
270                    }
271                    finally {
272                            _mainLock.unlock();
273                    }
274            }
275    
276            public ThreadFactory getThreadFactory() {
277                    return _threadFactory;
278            }
279    
280            public ThreadPoolHandler getThreadPoolHandler() {
281                    return _threadPoolHandler;
282            }
283    
284            public boolean isAllowCoreThreadTimeout() {
285                    return _allowCoreThreadTimeout;
286            }
287    
288            public boolean isShutdown() {
289                    if (_runState != _RUNNING) {
290                            return true;
291                    }
292                    else {
293                            return false;
294                    }
295            }
296    
297            public boolean isTerminated() {
298                    if (_runState == _TERMINATED) {
299                            return true;
300                    }
301                    else {
302                            return false;
303                    }
304            }
305    
306            public boolean isTerminating() {
307                    if (_runState == _STOP) {
308                            return true;
309                    }
310                    else {
311                            return false;
312                    }
313            }
314    
315            public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
316                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
317            }
318    
319            public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
320                    if (keepAliveTime < 0) {
321                            throw new IllegalArgumentException();
322                    }
323    
324                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
325            }
326    
327            public void setRejectedExecutionHandler(
328                    RejectedExecutionHandler rejectedExecutionHandler) {
329    
330                    if (rejectedExecutionHandler == null) {
331                            throw new NullPointerException();
332                    }
333    
334                    _rejectedExecutionHandler = rejectedExecutionHandler;
335            }
336    
337            public void setThreadFactory(ThreadFactory threadFactory) {
338                    if (threadFactory == null) {
339                            throw new NullPointerException();
340                    }
341    
342                    _threadFactory = threadFactory;
343            }
344    
345            public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
346                    if (threadPoolHandler == null) {
347                            throw new NullPointerException();
348                    }
349    
350                    _threadPoolHandler = threadPoolHandler;
351            }
352    
353            public void shutdown() {
354                    _mainLock.lock();
355    
356                    try {
357                            int state = _runState;
358    
359                            if (state < _SHUTDOWN) {
360                                    _runState = _SHUTDOWN;
361                            }
362    
363                            for (WorkerTask workerTask : _workerTasks) {
364                                    workerTask._interruptIfWaiting();
365                            }
366    
367                            _tryTerminate();
368                    }
369                    finally {
370                            _mainLock.unlock();
371                    }
372            }
373    
374            public List<Runnable> shutdownNow() {
375                    _mainLock.lock();
376    
377                    try {
378                            int state = _runState;
379    
380                            if (state < _STOP) {
381                                    _runState = _STOP;
382                            }
383    
384                            for (WorkerTask workerTask : _workerTasks) {
385                                    workerTask._thread.interrupt();
386                            }
387    
388                            List<Runnable> runnables = new ArrayList<Runnable>();
389    
390                            _taskQueue.drainTo(runnables);
391    
392                            _tryTerminate();
393    
394                            return runnables;
395                    }
396                    finally {
397                            _mainLock.unlock();
398                    }
399            }
400    
401            @Override
402            protected void finalize() {
403                    shutdown();
404            }
405    
406            protected ReentrantLock getMainLock() {
407                    return _mainLock;
408            }
409    
410            protected TaskQueue<Runnable> getTaskQueue() {
411                    return _taskQueue;
412            }
413    
414            protected Set<WorkerTask> getWorkerTasks() {
415                    return _workerTasks;
416            }
417    
418            private void _addWorkerThread() {
419                    int runState = _runState;
420                    int poolSize = _poolSize;
421    
422                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
423                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
424                             !_taskQueue.isEmpty())) {
425    
426                            _mainLock.lock();
427    
428                            try {
429                                    runState = _runState;
430                                    poolSize = _poolSize;
431    
432                                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
433                                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
434                                             !_taskQueue.isEmpty())) {
435    
436                                            Runnable runnable = _taskQueue.poll();
437    
438                                            if (runnable != null) {
439                                                    _doAddWorkerThread(runnable);
440                                            }
441                                    }
442                            }
443                            finally {
444                                    _mainLock.unlock();
445                            }
446                    }
447            }
448    
449            private void _doAddWorkerThread(Runnable runnable) {
450                    WorkerTask workerTask = new WorkerTask(runnable);
451    
452                    _workerTasks.add(workerTask);
453    
454                    int poolSize = ++_poolSize;
455    
456                    if (poolSize > _largestPoolSize) {
457                            _largestPoolSize = poolSize;
458                    }
459    
460                    workerTask._startWork();
461            }
462    
463            private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
464                    while (true) {
465                            try {
466                                    int state = _runState;
467    
468                                    if (state >= _STOP) {
469                                            return null;
470                                    }
471    
472                                    Runnable runnable = null;
473    
474                                    if (state == _SHUTDOWN) {
475                                            runnable = _taskQueue.poll();
476                                    }
477                                    else if ((_poolSize > _corePoolSize) ||
478                                                     _allowCoreThreadTimeout) {
479    
480                                            runnable = _taskQueue.poll(
481                                                    _keepAliveTime, TimeUnit.NANOSECONDS);
482                                    }
483                                    else {
484                                            runnable = _taskQueue.take();
485                                    }
486    
487                                    if (runnable != null) {
488                                            return runnable;
489                                    }
490    
491                                    _mainLock.lock();
492    
493                                    try {
494                                            if ((_runState >= _STOP) ||
495                                                    ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
496                                                    (_allowCoreThreadTimeout &&
497                                                     ((_poolSize > 1) || _taskQueue.isEmpty())) ||
498                                                    (!_allowCoreThreadTimeout &&
499                                                     (_poolSize > _corePoolSize))) {
500    
501                                                    _completedTaskCount +=
502                                                            workerTask._localCompletedTaskCount;
503    
504                                                    _workerTasks.remove(workerTask);
505    
506                                                    if (--_poolSize == 0) {
507                                                            _tryTerminate();
508                                                    }
509    
510                                                    cleanUpMarker[0] = true;
511    
512                                                    return null;
513                                            }
514                                    }
515                                    finally {
516                                            _mainLock.unlock();
517                                    }
518                            }
519                            catch (InterruptedException ie) {
520                            }
521                    }
522            }
523    
524            private void _tryTerminate() {
525                    if (_poolSize == 0) {
526                            int state = _runState;
527    
528                            if ((state == _STOP) ||
529                                    ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
530    
531                                    _runState = _TERMINATED;
532    
533                                    _terminationCondition.signalAll();
534                                    _threadPoolHandler.terminated();
535    
536                                    return;
537                            }
538    
539                            if (!_taskQueue.isEmpty()) {
540                                    _doAddWorkerThread(_taskQueue.poll());
541                            }
542                    }
543            }
544    
545            private static final int _RUNNING = 0;
546    
547            private static final int _SHUTDOWN = 1;
548    
549            private static final int _STOP = 2;
550    
551            private static final int _TERMINATED = 3;
552    
553            private volatile boolean _allowCoreThreadTimeout;
554            private long _completedTaskCount;
555            private volatile int _corePoolSize;
556            private volatile long _keepAliveTime;
557            private volatile int _largestPoolSize;
558            private final ReentrantLock _mainLock = new ReentrantLock();
559            private volatile int _maxPoolSize;
560            private volatile int _poolSize;
561            private volatile RejectedExecutionHandler _rejectedExecutionHandler;
562            private volatile int _runState;
563            private final TaskQueue<Runnable> _taskQueue;
564            private final Condition _terminationCondition = _mainLock.newCondition();
565            private volatile ThreadFactory _threadFactory;
566            private volatile ThreadPoolHandler _threadPoolHandler;
567            private final Set<WorkerTask> _workerTasks;
568    
569            private class WorkerTask
570                    extends AbstractQueuedSynchronizer implements Runnable {
571    
572                    public WorkerTask(Runnable runnable) {
573                            _runnable = runnable;
574                    }
575    
576                    public void run() {
577                            boolean[] cleanUpMarker = new boolean[1];
578    
579                            try {
580                                    Runnable runnable = _runnable;
581    
582                                    _runnable = null;
583    
584                                    do {
585                                            if (runnable != null) {
586                                                    _runTask(runnable);
587    
588                                                    runnable = null;
589                                            }
590                                    }
591                                    while ((runnable = _getTask(this, cleanUpMarker)) != null);
592                            }
593                            finally {
594                                    if (!cleanUpMarker[0]) {
595                                            _mainLock.lock();
596    
597                                            try {
598                                                    _completedTaskCount += _localCompletedTaskCount;
599    
600                                                    _workerTasks.remove(this);
601    
602                                                    if (--_poolSize == 0) {
603                                                            _tryTerminate();
604                                                    }
605                                            }
606                                            finally {
607                                                    _mainLock.unlock();
608                                            }
609                                    }
610    
611                                    _threadPoolHandler.beforeThreadEnd(_thread);
612                            }
613                    }
614    
615                    @Override
616                    protected boolean isHeldExclusively() {
617                            if (getState() == 1) {
618                                    return true;
619                            }
620                            else {
621                                    return false;
622                            }
623                    }
624    
625                    @Override
626                    protected boolean tryAcquire(int unused) {
627                            return compareAndSetState(0, 1);
628                    }
629    
630                    @Override
631                    protected boolean tryRelease(int unused) {
632                            setState(0);
633    
634                            return true;
635                    }
636    
637                    private boolean _interruptIfWaiting() {
638                            if (!_thread.isInterrupted() && tryAcquire(1)) {
639                                    try {
640                                            _thread.interrupt();
641    
642                                            return true;
643                                    }
644                                    finally {
645                                            _unlock();
646                                    }
647                            }
648    
649                            return false;
650                    }
651    
652                    private boolean _isLocked() {
653                            return isHeldExclusively();
654                    }
655    
656                    private void _lock() {
657                            acquire(1);
658                    }
659    
660                    private void _runTask(Runnable task) {
661                            _lock();
662    
663                            try {
664                                    if ((_runState < _STOP) && Thread.interrupted() &&
665                                            (_runState >= _STOP)) {
666    
667                                            _thread.interrupt();
668                                    }
669    
670                                    Throwable throwable = null;
671    
672                                    _threadPoolHandler.beforeExecute(_thread, task);
673    
674                                    try {
675                                            task.run();
676    
677                                            _localCompletedTaskCount++;
678                                    }
679                                    catch (RuntimeException re) {
680                                            throwable = re;
681    
682                                            throw re;
683                                    }
684                                    finally {
685                                            _threadPoolHandler.afterExecute(task, throwable);
686                                    }
687                            }
688                            finally {
689                                    _unlock();
690                            }
691                    }
692    
693                    private void _startWork() {
694                            _thread = _threadFactory.newThread(this);
695    
696                            _threadPoolHandler.beforeThreadStart(_thread);
697    
698                            _thread.start();
699                    }
700    
701                    private void _unlock() {
702                            release(1);
703                    }
704    
705                    private volatile long _localCompletedTaskCount;
706                    private Runnable _runnable;
707                    private Thread _thread;
708    
709            }
710    
711    }