001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.cluster.ClusterLink;
018    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022    import com.liferay.portal.kernel.log.Log;
023    import com.liferay.portal.kernel.log.LogFactoryUtil;
024    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
025    import com.liferay.portal.kernel.util.GroupThreadLocal;
026    import com.liferay.portal.kernel.util.LocaleThreadLocal;
027    import com.liferay.portal.kernel.util.NamedThreadFactory;
028    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
029    import com.liferay.portal.kernel.util.Validator;
030    import com.liferay.portal.model.User;
031    import com.liferay.portal.security.auth.CompanyThreadLocal;
032    import com.liferay.portal.security.auth.PrincipalThreadLocal;
033    import com.liferay.portal.security.permission.PermissionChecker;
034    import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
035    import com.liferay.portal.security.permission.PermissionThreadLocal;
036    import com.liferay.portal.service.UserLocalServiceUtil;
037    
038    import java.util.Locale;
039    import java.util.Set;
040    import java.util.concurrent.TimeUnit;
041    
042    /**
043     * @author Michael C. Han
044     * @author Shuyang Zhou
045     */
046    public abstract class BaseAsyncDestination extends BaseDestination {
047    
048            public BaseAsyncDestination() {
049            }
050    
051            /**
052             * @deprecated As of 6.1.0
053             */
054            @Deprecated
055            public BaseAsyncDestination(String name) {
056                    this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
057            }
058    
059            /**
060             * @deprecated As of 6.1.0
061             */
062            @Deprecated
063            public BaseAsyncDestination(
064                    String name, int workersCoreSize, int workersMaxSize) {
065    
066                    this.name = name;
067                    _workersCoreSize = workersCoreSize;
068                    _workersMaxSize = workersMaxSize;
069    
070                    open();
071            }
072    
073            @Override
074            public void close(boolean force) {
075                    PortalExecutorManagerUtil.shutdown(getName(), force);
076            }
077    
078            @Override
079            public DestinationStatistics getDestinationStatistics() {
080                    DestinationStatistics destinationStatistics =
081                            new DestinationStatistics();
082    
083                    destinationStatistics.setActiveThreadCount(
084                            _threadPoolExecutor.getActiveCount());
085                    destinationStatistics.setCurrentThreadCount(
086                            _threadPoolExecutor.getPoolSize());
087                    destinationStatistics.setLargestThreadCount(
088                            _threadPoolExecutor.getLargestPoolSize());
089                    destinationStatistics.setMaxThreadPoolSize(
090                            _threadPoolExecutor.getMaxPoolSize());
091                    destinationStatistics.setMinThreadPoolSize(
092                            _threadPoolExecutor.getCorePoolSize());
093                    destinationStatistics.setPendingMessageCount(
094                            _threadPoolExecutor.getPendingTaskCount());
095                    destinationStatistics.setSentMessageCount(
096                            _threadPoolExecutor.getCompletedTaskCount());
097    
098                    return destinationStatistics;
099            }
100    
101            public int getMaximumQueueSize() {
102                    return _maximumQueueSize;
103            }
104    
105            public int getWorkersCoreSize() {
106                    return _workersCoreSize;
107            }
108    
109            public int getWorkersMaxSize() {
110                    return _workersMaxSize;
111            }
112    
113            @Override
114            public void open() {
115                    if ((_threadPoolExecutor != null) &&
116                            !_threadPoolExecutor.isShutdown()) {
117    
118                            return;
119                    }
120    
121                    ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
122    
123                    if (_rejectedExecutionHandler == null) {
124                            _rejectedExecutionHandler = createRejectionExecutionHandler();
125                    }
126    
127                    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
128                            _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
129                            _maximumQueueSize, _rejectedExecutionHandler,
130                            new NamedThreadFactory(
131                                    getName(), Thread.NORM_PRIORITY, classLoader),
132                            new ThreadPoolHandlerAdapter());
133    
134                    ThreadPoolExecutor oldThreadPoolExecutor =
135                            PortalExecutorManagerUtil.registerPortalExecutor(
136                                    getName(), threadPoolExecutor);
137    
138                    if (oldThreadPoolExecutor != null) {
139                            if (_log.isWarnEnabled()) {
140                                    _log.warn(
141                                            "Abort creating a new thread pool for destination " +
142                                                    getName() + " and reuse previous one");
143                            }
144    
145                            threadPoolExecutor.shutdownNow();
146    
147                            threadPoolExecutor = oldThreadPoolExecutor;
148                    }
149    
150                    _threadPoolExecutor = threadPoolExecutor;
151            }
152    
153            @Override
154            public void send(Message message) {
155                    if (messageListeners.isEmpty()) {
156                            if (_log.isDebugEnabled()) {
157                                    _log.debug("No message listeners for destination " + getName());
158                            }
159    
160                            return;
161                    }
162    
163                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
164    
165                    if (threadPoolExecutor.isShutdown()) {
166                            throw new IllegalStateException(
167                                    "Destination " + getName() + " is shutdown and cannot " +
168                                            "receive more messages");
169                    }
170    
171                    populateMessageFromThreadLocals(message);
172    
173                    dispatch(messageListeners, message);
174            }
175    
176            public void setMaximumQueueSize(int maximumQueueSize) {
177                    _maximumQueueSize = maximumQueueSize;
178            }
179    
180            public void setRejectedExecutionHandler(
181                    RejectedExecutionHandler rejectedExecutionHandler) {
182    
183                    _rejectedExecutionHandler = rejectedExecutionHandler;
184            }
185    
186            public void setWorkersCoreSize(int workersCoreSize) {
187                    _workersCoreSize = workersCoreSize;
188            }
189    
190            public void setWorkersMaxSize(int workersMaxSize) {
191                    _workersMaxSize = workersMaxSize;
192            }
193    
194            protected RejectedExecutionHandler createRejectionExecutionHandler() {
195                    return new RejectedExecutionHandler() {
196    
197                            @Override
198                            public void rejectedExecution(
199                                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
200    
201                                    if (!_log.isWarnEnabled()) {
202                                            return;
203                                    }
204    
205                                    MessageRunnable messageRunnable = (MessageRunnable)runnable;
206    
207                                    _log.warn(
208                                            "Discarding message " + messageRunnable.getMessage() +
209                                                    " because it exceeds the maximum queue size of " +
210                                                            _maximumQueueSize);
211                            }
212    
213                    };
214            }
215    
216            protected abstract void dispatch(
217                    Set<MessageListener> messageListeners, Message message);
218    
219            protected ThreadPoolExecutor getThreadPoolExecutor() {
220                    return _threadPoolExecutor;
221            }
222    
223            protected void populateMessageFromThreadLocals(Message message) {
224                    if (!message.contains("companyId")) {
225                            message.put("companyId", CompanyThreadLocal.getCompanyId());
226                    }
227    
228                    if (!message.contains("defaultLocale")) {
229                            message.put("defaultLocale", LocaleThreadLocal.getDefaultLocale());
230                    }
231    
232                    if (!message.contains("groupId")) {
233                            message.put("groupId", GroupThreadLocal.getGroupId());
234                    }
235    
236                    if (!message.contains("permissionChecker")) {
237                            message.put(
238                                    "permissionChecker",
239                                    PermissionThreadLocal.getPermissionChecker());
240                    }
241    
242                    if (!message.contains("principalName")) {
243                            message.put("principalName", PrincipalThreadLocal.getName());
244                    }
245    
246                    if (!message.contains("principalPassword")) {
247                            message.put(
248                                    "principalPassword", PrincipalThreadLocal.getPassword());
249                    }
250    
251                    if (!message.contains("siteDefaultLocale")) {
252                            message.put(
253                                    "siteDefaultLocale", LocaleThreadLocal.getSiteDefaultLocale());
254                    }
255    
256                    if (!message.contains("themeDisplayLocale")) {
257                            message.put(
258                                    "themeDisplayLocale",
259                                    LocaleThreadLocal.getThemeDisplayLocale());
260                    }
261            }
262    
263            protected void populateThreadLocalsFromMessage(Message message) {
264                    long companyId = message.getLong("companyId");
265    
266                    if (companyId > 0) {
267                            CompanyThreadLocal.setCompanyId(companyId);
268                    }
269    
270                    Locale defaultLocale = (Locale)message.get("defaultLocale");
271    
272                    if (defaultLocale != null) {
273                            LocaleThreadLocal.setDefaultLocale(defaultLocale);
274                    }
275    
276                    long groupId = message.getLong("groupId");
277    
278                    if (groupId > 0) {
279                            GroupThreadLocal.setGroupId(groupId);
280                    }
281    
282                    PermissionChecker permissionChecker = (PermissionChecker)message.get(
283                            "permissionChecker");
284    
285                    String principalName = message.getString("principalName");
286    
287                    if (Validator.isNotNull(principalName)) {
288                            PrincipalThreadLocal.setName(principalName);
289                    }
290    
291                    if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
292                            try {
293                                    User user = UserLocalServiceUtil.fetchUser(
294                                            PrincipalThreadLocal.getUserId());
295    
296                                    permissionChecker = PermissionCheckerFactoryUtil.create(user);
297                            }
298                            catch (Exception e) {
299                                    throw new RuntimeException(e);
300                            }
301                    }
302    
303                    if (permissionChecker != null) {
304                            PermissionThreadLocal.setPermissionChecker(permissionChecker);
305                    }
306    
307                    String principalPassword = message.getString("principalPassword");
308    
309                    if (Validator.isNotNull(principalPassword)) {
310                            PrincipalThreadLocal.setPassword(principalPassword);
311                    }
312    
313                    Boolean clusterForwardMessage = (Boolean)message.get(
314                            ClusterLink.CLUSTER_FORWARD_MESSAGE);
315    
316                    if (clusterForwardMessage != null) {
317                            MessageValuesThreadLocal.setValue(
318                                    ClusterLink.CLUSTER_FORWARD_MESSAGE, clusterForwardMessage);
319                    }
320    
321                    Locale siteDefaultLocale = (Locale)message.get("siteDefaultLocale");
322    
323                    if (siteDefaultLocale != null) {
324                            LocaleThreadLocal.setSiteDefaultLocale(siteDefaultLocale);
325                    }
326    
327                    Locale themeDisplayLocale = (Locale)message.get("themeDisplayLocale");
328    
329                    if (themeDisplayLocale != null) {
330                            LocaleThreadLocal.setThemeDisplayLocale(themeDisplayLocale);
331                    }
332            }
333    
334            private static final int _WORKERS_CORE_SIZE = 2;
335    
336            private static final int _WORKERS_MAX_SIZE = 5;
337    
338            private static final Log _log = LogFactoryUtil.getLog(
339                    BaseAsyncDestination.class);
340    
341            private int _maximumQueueSize = Integer.MAX_VALUE;
342            private RejectedExecutionHandler _rejectedExecutionHandler;
343            private ThreadPoolExecutor _threadPoolExecutor;
344            private int _workersCoreSize = _WORKERS_CORE_SIZE;
345            private int _workersMaxSize = _WORKERS_MAX_SIZE;
346    
347    }