001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
018    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022    import com.liferay.portal.kernel.log.Log;
023    import com.liferay.portal.kernel.log.LogFactoryUtil;
024    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
025    import com.liferay.portal.kernel.util.NamedThreadFactory;
026    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027    import com.liferay.portal.kernel.util.Validator;
028    import com.liferay.portal.model.User;
029    import com.liferay.portal.security.auth.CompanyThreadLocal;
030    import com.liferay.portal.security.auth.PrincipalThreadLocal;
031    import com.liferay.portal.security.permission.PermissionChecker;
032    import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
033    import com.liferay.portal.security.permission.PermissionThreadLocal;
034    import com.liferay.portal.service.UserLocalServiceUtil;
035    
036    import java.util.Set;
037    import java.util.concurrent.TimeUnit;
038    
039    /**
040     * @author Michael C. Han
041     * @author Shuyang Zhou
042     */
043    public abstract class BaseAsyncDestination extends BaseDestination {
044    
045            public BaseAsyncDestination() {
046            }
047    
048            /**
049             * @deprecated
050             */
051            public BaseAsyncDestination(String name) {
052                    this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
053            }
054    
055            /**
056             * @deprecated
057             */
058            public BaseAsyncDestination(
059                    String name, int workersCoreSize, int workersMaxSize) {
060    
061                    this.name = name;
062                    _workersCoreSize = workersCoreSize;
063                    _workersMaxSize = workersMaxSize;
064    
065                    open();
066            }
067    
068            @Override
069            public void close(boolean force) {
070                    PortalExecutorManagerUtil.shutdown(getName(), force);
071            }
072    
073            public DestinationStatistics getDestinationStatistics() {
074                    DestinationStatistics destinationStatistics =
075                            new DestinationStatistics();
076    
077                    destinationStatistics.setActiveThreadCount(
078                            _threadPoolExecutor.getActiveCount());
079                    destinationStatistics.setCurrentThreadCount(
080                            _threadPoolExecutor.getPoolSize());
081                    destinationStatistics.setLargestThreadCount(
082                            _threadPoolExecutor.getLargestPoolSize());
083                    destinationStatistics.setMaxThreadPoolSize(
084                            _threadPoolExecutor.getMaxPoolSize());
085                    destinationStatistics.setMinThreadPoolSize(
086                            _threadPoolExecutor.getCorePoolSize());
087                    destinationStatistics.setPendingMessageCount(
088                            _threadPoolExecutor.getPendingTaskCount());
089                    destinationStatistics.setSentMessageCount(
090                            _threadPoolExecutor.getCompletedTaskCount());
091    
092                    return destinationStatistics;
093            }
094    
095            public int getMaximumQueueSize() {
096                    return _maximumQueueSize;
097            }
098    
099            public int getWorkersCoreSize() {
100                    return _workersCoreSize;
101            }
102    
103            public int getWorkersMaxSize() {
104                    return _workersMaxSize;
105            }
106    
107            @Override
108            public void open() {
109                    if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
110                            ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
111    
112                            if (_rejectedExecutionHandler == null) {
113                                    _rejectedExecutionHandler = createRejectionExecutionHandler();
114                            }
115    
116                            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
117                                    _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
118                                    _maximumQueueSize, _rejectedExecutionHandler,
119                                    new NamedThreadFactory(
120                                            getName(), Thread.NORM_PRIORITY, classLoader),
121                                    new ThreadPoolHandlerAdapter());
122    
123                            ThreadPoolExecutor oldThreadPoolExecutor =
124                                    PortalExecutorManagerUtil.registerPortalExecutor(
125                                            getName(), threadPoolExecutor);
126    
127                            if (oldThreadPoolExecutor != null) {
128                                    if (_log.isWarnEnabled()) {
129                                            _log.warn(
130                                                    "Abort creating a new thread pool for destination " +
131                                                            getName() + " and reuse previous one");
132                                    }
133    
134                                    threadPoolExecutor.shutdownNow();
135    
136                                    threadPoolExecutor = oldThreadPoolExecutor;
137                            }
138    
139                            _threadPoolExecutor = threadPoolExecutor;
140                    }
141            }
142    
143            public void send(Message message) {
144                    if (messageListeners.isEmpty()) {
145                            if (_log.isDebugEnabled()) {
146                                    _log.debug("No message listeners for destination " + getName());
147                            }
148    
149                            return;
150                    }
151    
152                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
153    
154                    if (threadPoolExecutor.isShutdown()) {
155                            throw new IllegalStateException(
156                                    "Destination " + getName() + " is shutdown and cannot " +
157                                            "receive more messages");
158                    }
159    
160                    populateMessageFromThreadLocals(message);
161    
162                    dispatch(messageListeners, message);
163            }
164    
165            public void setMaximumQueueSize(int maximumQueueSize) {
166                    _maximumQueueSize = maximumQueueSize;
167            }
168    
169            public void setRejectedExecutionHandler(
170                    RejectedExecutionHandler rejectedExecutionHandler) {
171    
172                    _rejectedExecutionHandler = rejectedExecutionHandler;
173            }
174    
175            public void setWorkersCoreSize(int workersCoreSize) {
176                    _workersCoreSize = workersCoreSize;
177            }
178    
179            public void setWorkersMaxSize(int workersMaxSize) {
180                    _workersMaxSize = workersMaxSize;
181            }
182    
183            protected RejectedExecutionHandler createRejectionExecutionHandler() {
184                    return new RejectedExecutionHandler() {
185    
186                            public void rejectedExecution(
187                                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
188    
189                                    if (!_log.isWarnEnabled()) {
190                                            return;
191                                    }
192    
193                                    MessageRunnable messageRunnable = (MessageRunnable)runnable;
194    
195                                    _log.warn(
196                                            "Discarding message " + messageRunnable.getMessage() +
197                                                    " because it exceeds the maximum queue size of " +
198                                                            _maximumQueueSize);
199                            }
200    
201                    };
202            }
203    
204            protected abstract void dispatch(
205                    Set<MessageListener> messageListeners, Message message);
206    
207            protected ThreadPoolExecutor getThreadPoolExecutor() {
208                    return _threadPoolExecutor;
209            }
210    
211            protected void populateMessageFromThreadLocals(Message message) {
212                    if (!message.contains("companyId")) {
213                            message.put("companyId", CompanyThreadLocal.getCompanyId());
214                    }
215    
216                    if (!message.contains("permissionChecker")) {
217                            message.put(
218                                    "permissionChecker",
219                                    PermissionThreadLocal.getPermissionChecker());
220                    }
221    
222                    if (!message.contains("principalName")) {
223                            message.put("principalName", PrincipalThreadLocal.getName());
224                    }
225    
226                    if (!message.contains("principalPassword")) {
227                            message.put(
228                                    "principalPassword", PrincipalThreadLocal.getPassword());
229                    }
230            }
231    
232            protected void populateThreadLocalsFromMessage(Message message) {
233                    long companyId = message.getLong("companyId");
234    
235                    if (companyId > 0) {
236                            CompanyThreadLocal.setCompanyId(companyId);
237                    }
238    
239                    PermissionChecker permissionChecker = (PermissionChecker)message.get(
240                            "permissionChecker");
241    
242                    String principalName = message.getString("principalName");
243    
244                    if (Validator.isNotNull(principalName)) {
245                            PrincipalThreadLocal.setName(principalName);
246                    }
247    
248                    if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
249                            try {
250                                    User user = UserLocalServiceUtil.fetchUser(
251                                            PrincipalThreadLocal.getUserId());
252    
253                                    permissionChecker = PermissionCheckerFactoryUtil.create(user);
254                            }
255                            catch (Exception e) {
256                                    throw new RuntimeException(e);
257                            }
258                    }
259    
260                    if (permissionChecker != null) {
261                            PermissionThreadLocal.setPermissionChecker(permissionChecker);
262                    }
263    
264                    String principalPassword = message.getString("principalPassword");
265    
266                    if (Validator.isNotNull(principalPassword)) {
267                            PrincipalThreadLocal.setPassword(principalPassword);
268                    }
269    
270                    Boolean clusterForwardMessage = (Boolean)message.get(
271                            ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE);
272    
273                    if (clusterForwardMessage != null) {
274                            MessageValuesThreadLocal.setValue(
275                                    ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE, clusterForwardMessage);
276                    }
277            }
278    
279            private static final int _WORKERS_CORE_SIZE = 2;
280    
281            private static final int _WORKERS_MAX_SIZE = 5;
282    
283            private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
284    
285            private int _maximumQueueSize = Integer.MAX_VALUE;
286            private RejectedExecutionHandler _rejectedExecutionHandler;
287            private ThreadPoolExecutor _threadPoolExecutor;
288            private int _workersCoreSize = _WORKERS_CORE_SIZE;
289            private int _workersMaxSize = _WORKERS_MAX_SIZE;
290    
291    }