001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
018    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
020    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.util.NamedThreadFactory;
024    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
025    
026    import java.util.Set;
027    import java.util.concurrent.TimeUnit;
028    
029    /**
030     * @author Michael C. Han
031     * @author Shuyang Zhou
032     */
033    public abstract class BaseAsyncDestination extends BaseDestination {
034    
035            public BaseAsyncDestination() {
036            }
037    
038            /**
039             * @deprecated
040             */
041            public BaseAsyncDestination(String name) {
042                    this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
043            }
044    
045            /**
046             * @deprecated
047             */
048            public BaseAsyncDestination(
049                    String name, int workersCoreSize, int workersMaxSize) {
050    
051                    this.name = name;
052                    _workersCoreSize = workersCoreSize;
053                    _workersMaxSize = workersMaxSize;
054    
055                    open();
056            }
057    
058            @Override
059            public void close(boolean force) {
060                    PortalExecutorManagerUtil.shutdown(getName(), force);
061            }
062    
063            public DestinationStatistics getDestinationStatistics() {
064                    DestinationStatistics destinationStatistics =
065                            new DestinationStatistics();
066    
067                    destinationStatistics.setActiveThreadCount(
068                            _threadPoolExecutor.getActiveCount());
069                    destinationStatistics.setCurrentThreadCount(
070                            _threadPoolExecutor.getPoolSize());
071                    destinationStatistics.setLargestThreadCount(
072                            _threadPoolExecutor.getLargestPoolSize());
073                    destinationStatistics.setMaxThreadPoolSize(
074                            _threadPoolExecutor.getMaxPoolSize());
075                    destinationStatistics.setMinThreadPoolSize(
076                            _threadPoolExecutor.getCorePoolSize());
077                    destinationStatistics.setPendingMessageCount(
078                            _threadPoolExecutor.getPendingTaskCount());
079                    destinationStatistics.setSentMessageCount(
080                            _threadPoolExecutor.getCompletedTaskCount());
081    
082                    return destinationStatistics;
083            }
084    
085            public int getMaximumQueueSize() {
086                    return _maximumQueueSize;
087            }
088    
089            public int getWorkersCoreSize() {
090                    return _workersCoreSize;
091            }
092    
093            public int getWorkersMaxSize() {
094                    return _workersMaxSize;
095            }
096    
097            @Override
098            public void open() {
099                    if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
100                            ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
101    
102                            if (_rejectedExecutionHandler == null) {
103                                    _rejectedExecutionHandler = createRejectionExecutionHandler();
104                            }
105    
106                            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
107                                    _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
108                                    _maximumQueueSize, _rejectedExecutionHandler,
109                                    new NamedThreadFactory(
110                                            getName(), Thread.NORM_PRIORITY, classLoader),
111                                    new ThreadPoolHandlerAdapter());
112    
113                            ThreadPoolExecutor oldThreadPoolExecutor =
114                                    PortalExecutorManagerUtil.registerPortalExecutor(
115                                            getName(), threadPoolExecutor);
116    
117                            if (oldThreadPoolExecutor != null) {
118                                    if (_log.isWarnEnabled()) {
119                                            _log.warn(
120                                                    "Abort creating a new thread pool for destination " +
121                                                            getName() + " and reuse previous one");
122                                    }
123    
124                                    threadPoolExecutor.shutdownNow();
125    
126                                    threadPoolExecutor = oldThreadPoolExecutor;
127                            }
128    
129                            _threadPoolExecutor = threadPoolExecutor;
130                    }
131            }
132    
133            public void send(Message message) {
134                    if (messageListeners.isEmpty()) {
135                            if (_log.isDebugEnabled()) {
136                                    _log.debug("No message listeners for destination " + getName());
137                            }
138    
139                            return;
140                    }
141    
142                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
143    
144                    if (threadPoolExecutor.isShutdown()) {
145                            throw new IllegalStateException(
146                                    "Destination " + getName() + " is shutdown and cannot " +
147                                            "receive more messages");
148                    }
149    
150                    dispatch(messageListeners, message);
151            }
152    
153            public void setMaximumQueueSize(int maximumQueueSize) {
154                    _maximumQueueSize = maximumQueueSize;
155            }
156    
157            public void setRejectedExecutionHandler(
158                    RejectedExecutionHandler rejectedExecutionHandler) {
159    
160                    _rejectedExecutionHandler = rejectedExecutionHandler;
161            }
162    
163            public void setWorkersCoreSize(int workersCoreSize) {
164                    _workersCoreSize = workersCoreSize;
165            }
166    
167            public void setWorkersMaxSize(int workersMaxSize) {
168                    _workersMaxSize = workersMaxSize;
169            }
170    
171            protected RejectedExecutionHandler createRejectionExecutionHandler() {
172                    return new RejectedExecutionHandler() {
173    
174                            public void rejectedExecution(
175                                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
176    
177                                    if (!_log.isWarnEnabled()) {
178                                            return;
179                                    }
180    
181                                    MessageRunnable messageRunnable = (MessageRunnable) runnable;
182    
183                                    _log.warn(
184                                            "Discarding message " + messageRunnable.getMessage() +
185                                                    " because it exceeds the maximum queue size of " +
186                                                            _maximumQueueSize);
187                            }
188    
189                    };
190            }
191    
192            protected abstract void dispatch(
193                    Set<MessageListener> messageListeners, Message message);
194    
195            protected ThreadPoolExecutor getThreadPoolExecutor() {
196                    return _threadPoolExecutor;
197            }
198    
199            private static final int _WORKERS_CORE_SIZE = 2;
200    
201            private static final int _WORKERS_MAX_SIZE = 5;
202    
203            private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
204    
205            private int _maximumQueueSize = Integer.MAX_VALUE;
206            private RejectedExecutionHandler _rejectedExecutionHandler;
207            private ThreadPoolExecutor _threadPoolExecutor;
208            private int _workersCoreSize = _WORKERS_CORE_SIZE;
209            private int _workersMaxSize = _WORKERS_MAX_SIZE;
210    
211    }