001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.cluster.ClusterLink;
018    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022    import com.liferay.portal.kernel.log.Log;
023    import com.liferay.portal.kernel.log.LogFactoryUtil;
024    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
025    import com.liferay.portal.kernel.util.GroupThreadLocal;
026    import com.liferay.portal.kernel.util.LocaleThreadLocal;
027    import com.liferay.portal.kernel.util.NamedThreadFactory;
028    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
029    import com.liferay.portal.kernel.util.Validator;
030    import com.liferay.portal.model.User;
031    import com.liferay.portal.security.auth.CompanyThreadLocal;
032    import com.liferay.portal.security.auth.PrincipalThreadLocal;
033    import com.liferay.portal.security.permission.PermissionChecker;
034    import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
035    import com.liferay.portal.security.permission.PermissionThreadLocal;
036    import com.liferay.portal.service.UserLocalServiceUtil;
037    
038    import java.util.Locale;
039    import java.util.Set;
040    import java.util.concurrent.TimeUnit;
041    
042    /**
043     * @author Michael C. Han
044     * @author Shuyang Zhou
045     */
046    public abstract class BaseAsyncDestination extends BaseDestination {
047    
048            public BaseAsyncDestination() {
049            }
050    
051            /**
052             * @deprecated As of 6.1.0
053             */
054            public BaseAsyncDestination(String name) {
055                    this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
056            }
057    
058            /**
059             * @deprecated As of 6.1.0
060             */
061            public BaseAsyncDestination(
062                    String name, int workersCoreSize, int workersMaxSize) {
063    
064                    this.name = name;
065                    _workersCoreSize = workersCoreSize;
066                    _workersMaxSize = workersMaxSize;
067    
068                    open();
069            }
070    
071            @Override
072            public void close(boolean force) {
073                    PortalExecutorManagerUtil.shutdown(getName(), force);
074            }
075    
076            @Override
077            public DestinationStatistics getDestinationStatistics() {
078                    DestinationStatistics destinationStatistics =
079                            new DestinationStatistics();
080    
081                    destinationStatistics.setActiveThreadCount(
082                            _threadPoolExecutor.getActiveCount());
083                    destinationStatistics.setCurrentThreadCount(
084                            _threadPoolExecutor.getPoolSize());
085                    destinationStatistics.setLargestThreadCount(
086                            _threadPoolExecutor.getLargestPoolSize());
087                    destinationStatistics.setMaxThreadPoolSize(
088                            _threadPoolExecutor.getMaxPoolSize());
089                    destinationStatistics.setMinThreadPoolSize(
090                            _threadPoolExecutor.getCorePoolSize());
091                    destinationStatistics.setPendingMessageCount(
092                            _threadPoolExecutor.getPendingTaskCount());
093                    destinationStatistics.setSentMessageCount(
094                            _threadPoolExecutor.getCompletedTaskCount());
095    
096                    return destinationStatistics;
097            }
098    
099            public int getMaximumQueueSize() {
100                    return _maximumQueueSize;
101            }
102    
103            public int getWorkersCoreSize() {
104                    return _workersCoreSize;
105            }
106    
107            public int getWorkersMaxSize() {
108                    return _workersMaxSize;
109            }
110    
111            @Override
112            public void open() {
113                    if ((_threadPoolExecutor != null) &&
114                            !_threadPoolExecutor.isShutdown()) {
115    
116                            return;
117                    }
118    
119                    ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
120    
121                    if (_rejectedExecutionHandler == null) {
122                            _rejectedExecutionHandler = createRejectionExecutionHandler();
123                    }
124    
125                    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
126                            _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
127                            _maximumQueueSize, _rejectedExecutionHandler,
128                            new NamedThreadFactory(
129                                    getName(), Thread.NORM_PRIORITY, classLoader),
130                            new ThreadPoolHandlerAdapter());
131    
132                    ThreadPoolExecutor oldThreadPoolExecutor =
133                            PortalExecutorManagerUtil.registerPortalExecutor(
134                                    getName(), threadPoolExecutor);
135    
136                    if (oldThreadPoolExecutor != null) {
137                            if (_log.isWarnEnabled()) {
138                                    _log.warn(
139                                            "Abort creating a new thread pool for destination " +
140                                                    getName() + " and reuse previous one");
141                            }
142    
143                            threadPoolExecutor.shutdownNow();
144    
145                            threadPoolExecutor = oldThreadPoolExecutor;
146                    }
147    
148                    _threadPoolExecutor = threadPoolExecutor;
149            }
150    
151            @Override
152            public void send(Message message) {
153                    if (messageListeners.isEmpty()) {
154                            if (_log.isDebugEnabled()) {
155                                    _log.debug("No message listeners for destination " + getName());
156                            }
157    
158                            return;
159                    }
160    
161                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
162    
163                    if (threadPoolExecutor.isShutdown()) {
164                            throw new IllegalStateException(
165                                    "Destination " + getName() + " is shutdown and cannot " +
166                                            "receive more messages");
167                    }
168    
169                    populateMessageFromThreadLocals(message);
170    
171                    dispatch(messageListeners, message);
172            }
173    
174            public void setMaximumQueueSize(int maximumQueueSize) {
175                    _maximumQueueSize = maximumQueueSize;
176            }
177    
178            public void setRejectedExecutionHandler(
179                    RejectedExecutionHandler rejectedExecutionHandler) {
180    
181                    _rejectedExecutionHandler = rejectedExecutionHandler;
182            }
183    
184            public void setWorkersCoreSize(int workersCoreSize) {
185                    _workersCoreSize = workersCoreSize;
186            }
187    
188            public void setWorkersMaxSize(int workersMaxSize) {
189                    _workersMaxSize = workersMaxSize;
190            }
191    
192            protected RejectedExecutionHandler createRejectionExecutionHandler() {
193                    return new RejectedExecutionHandler() {
194    
195                            @Override
196                            public void rejectedExecution(
197                                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
198    
199                                    if (!_log.isWarnEnabled()) {
200                                            return;
201                                    }
202    
203                                    MessageRunnable messageRunnable = (MessageRunnable)runnable;
204    
205                                    _log.warn(
206                                            "Discarding message " + messageRunnable.getMessage() +
207                                                    " because it exceeds the maximum queue size of " +
208                                                            _maximumQueueSize);
209                            }
210    
211                    };
212            }
213    
214            protected abstract void dispatch(
215                    Set<MessageListener> messageListeners, Message message);
216    
217            protected ThreadPoolExecutor getThreadPoolExecutor() {
218                    return _threadPoolExecutor;
219            }
220    
221            protected void populateMessageFromThreadLocals(Message message) {
222                    if (!message.contains("companyId")) {
223                            message.put("companyId", CompanyThreadLocal.getCompanyId());
224                    }
225    
226                    if (!message.contains("defaultLocale")) {
227                            message.put("defaultLocale", LocaleThreadLocal.getDefaultLocale());
228                    }
229    
230                    if (!message.contains("groupId")) {
231                            message.put("groupId", GroupThreadLocal.getGroupId());
232                    }
233    
234                    if (!message.contains("permissionChecker")) {
235                            message.put(
236                                    "permissionChecker",
237                                    PermissionThreadLocal.getPermissionChecker());
238                    }
239    
240                    if (!message.contains("principalName")) {
241                            message.put("principalName", PrincipalThreadLocal.getName());
242                    }
243    
244                    if (!message.contains("principalPassword")) {
245                            message.put(
246                                    "principalPassword", PrincipalThreadLocal.getPassword());
247                    }
248    
249                    if (!message.contains("siteDefaultLocale")) {
250                            message.put(
251                                    "siteDefaultLocale", LocaleThreadLocal.getSiteDefaultLocale());
252                    }
253    
254                    if (!message.contains("themeDisplayLocale")) {
255                            message.put(
256                                    "themeDisplayLocale",
257                                    LocaleThreadLocal.getThemeDisplayLocale());
258                    }
259            }
260    
261            protected void populateThreadLocalsFromMessage(Message message) {
262                    long companyId = message.getLong("companyId");
263    
264                    if (companyId > 0) {
265                            CompanyThreadLocal.setCompanyId(companyId);
266                    }
267    
268                    Locale defaultLocale = (Locale)message.get("defaultLocale");
269    
270                    if (defaultLocale != null) {
271                            LocaleThreadLocal.setDefaultLocale(defaultLocale);
272                    }
273    
274                    long groupId = message.getLong("groupId");
275    
276                    if (groupId > 0) {
277                            GroupThreadLocal.setGroupId(groupId);
278                    }
279    
280                    PermissionChecker permissionChecker = (PermissionChecker)message.get(
281                            "permissionChecker");
282    
283                    String principalName = message.getString("principalName");
284    
285                    if (Validator.isNotNull(principalName)) {
286                            PrincipalThreadLocal.setName(principalName);
287                    }
288    
289                    if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
290                            try {
291                                    User user = UserLocalServiceUtil.fetchUser(
292                                            PrincipalThreadLocal.getUserId());
293    
294                                    permissionChecker = PermissionCheckerFactoryUtil.create(user);
295                            }
296                            catch (Exception e) {
297                                    throw new RuntimeException(e);
298                            }
299                    }
300    
301                    if (permissionChecker != null) {
302                            PermissionThreadLocal.setPermissionChecker(permissionChecker);
303                    }
304    
305                    String principalPassword = message.getString("principalPassword");
306    
307                    if (Validator.isNotNull(principalPassword)) {
308                            PrincipalThreadLocal.setPassword(principalPassword);
309                    }
310    
311                    Boolean clusterForwardMessage = (Boolean)message.get(
312                            ClusterLink.CLUSTER_FORWARD_MESSAGE);
313    
314                    if (clusterForwardMessage != null) {
315                            MessageValuesThreadLocal.setValue(
316                                    ClusterLink.CLUSTER_FORWARD_MESSAGE, clusterForwardMessage);
317                    }
318    
319                    Locale siteDefaultLocale = (Locale)message.get("siteDefaultLocale");
320    
321                    if (siteDefaultLocale != null) {
322                            LocaleThreadLocal.setSiteDefaultLocale(siteDefaultLocale);
323                    }
324    
325                    Locale themeDisplayLocale = (Locale)message.get("themeDisplayLocale");
326    
327                    if (themeDisplayLocale != null) {
328                            LocaleThreadLocal.setThemeDisplayLocale(themeDisplayLocale);
329                    }
330            }
331    
332            private static final int _WORKERS_CORE_SIZE = 2;
333    
334            private static final int _WORKERS_MAX_SIZE = 5;
335    
336            private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
337    
338            private int _maximumQueueSize = Integer.MAX_VALUE;
339            private RejectedExecutionHandler _rejectedExecutionHandler;
340            private ThreadPoolExecutor _threadPoolExecutor;
341            private int _workersCoreSize = _WORKERS_CORE_SIZE;
342            private int _workersMaxSize = _WORKERS_MAX_SIZE;
343    
344    }