001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
018    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021    import com.liferay.portal.kernel.executor.PortalExecutorManager;
022    import com.liferay.portal.kernel.log.Log;
023    import com.liferay.portal.kernel.log.LogFactoryUtil;
024    import com.liferay.portal.kernel.util.GroupThreadLocal;
025    import com.liferay.portal.kernel.util.LocaleThreadLocal;
026    import com.liferay.portal.kernel.util.NamedThreadFactory;
027    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
028    import com.liferay.portal.kernel.util.Validator;
029    import com.liferay.portal.model.User;
030    import com.liferay.portal.security.auth.CompanyThreadLocal;
031    import com.liferay.portal.security.auth.PrincipalThreadLocal;
032    import com.liferay.portal.security.permission.PermissionChecker;
033    import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
034    import com.liferay.portal.security.permission.PermissionThreadLocal;
035    import com.liferay.portal.service.UserLocalServiceUtil;
036    import com.liferay.registry.Registry;
037    import com.liferay.registry.RegistryUtil;
038    import com.liferay.registry.ServiceReference;
039    import com.liferay.registry.ServiceTracker;
040    import com.liferay.registry.ServiceTrackerCustomizer;
041    
042    import java.util.Locale;
043    import java.util.Set;
044    import java.util.concurrent.TimeUnit;
045    
046    /**
047     * @author Michael C. Han
048     * @author Shuyang Zhou
049     */
050    public abstract class BaseAsyncDestination extends BaseDestination {
051    
052            @Override
053            public void afterPropertiesSet() {
054                    super.afterPropertiesSet();
055    
056                    Registry registry = RegistryUtil.getRegistry();
057    
058                    serviceTracker = registry.trackServices(
059                            PortalExecutorManager.class,
060                            new PortalExecutorManagerServiceTrackerCustomizer());
061    
062                    serviceTracker.open();
063            }
064    
065            @Override
066            public void close(boolean force) {
067                    if (portalExecutorManager == null) {
068                            return;
069                    }
070    
071                    ThreadPoolExecutor threadPoolExecutor =
072                            portalExecutorManager.getPortalExecutor(getName());
073    
074                    if (force) {
075                            threadPoolExecutor.shutdownNow();
076                    }
077                    else {
078                            threadPoolExecutor.shutdown();
079                    }
080            }
081    
082            @Override
083            public void destroy() {
084                    super.destroy();
085    
086                    serviceTracker.close();
087            }
088    
089            @Override
090            public DestinationStatistics getDestinationStatistics() {
091                    DestinationStatistics destinationStatistics =
092                            new DestinationStatistics();
093    
094                    destinationStatistics.setActiveThreadCount(
095                            _threadPoolExecutor.getActiveCount());
096                    destinationStatistics.setCurrentThreadCount(
097                            _threadPoolExecutor.getPoolSize());
098                    destinationStatistics.setLargestThreadCount(
099                            _threadPoolExecutor.getLargestPoolSize());
100                    destinationStatistics.setMaxThreadPoolSize(
101                            _threadPoolExecutor.getMaxPoolSize());
102                    destinationStatistics.setMinThreadPoolSize(
103                            _threadPoolExecutor.getCorePoolSize());
104                    destinationStatistics.setPendingMessageCount(
105                            _threadPoolExecutor.getPendingTaskCount());
106                    destinationStatistics.setSentMessageCount(
107                            _threadPoolExecutor.getCompletedTaskCount());
108    
109                    return destinationStatistics;
110            }
111    
112            public int getMaximumQueueSize() {
113                    return _maximumQueueSize;
114            }
115    
116            public int getWorkersCoreSize() {
117                    return _workersCoreSize;
118            }
119    
120            public int getWorkersMaxSize() {
121                    return _workersMaxSize;
122            }
123    
124            @Override
125            public void open() {
126                    if ((_threadPoolExecutor != null) &&
127                            !_threadPoolExecutor.isShutdown()) {
128    
129                            return;
130                    }
131    
132                    ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
133    
134                    if (_rejectedExecutionHandler == null) {
135                            _rejectedExecutionHandler = createRejectionExecutionHandler();
136                    }
137    
138                    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
139                            _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
140                            _maximumQueueSize, _rejectedExecutionHandler,
141                            new NamedThreadFactory(
142                                    getName(), Thread.NORM_PRIORITY, classLoader),
143                            new ThreadPoolHandlerAdapter());
144    
145                    ThreadPoolExecutor oldThreadPoolExecutor =
146                            portalExecutorManager.registerPortalExecutor(
147                                    getName(), threadPoolExecutor);
148    
149                    if (oldThreadPoolExecutor != null) {
150                            if (_log.isWarnEnabled()) {
151                                    _log.warn(
152                                            "Abort creating a new thread pool for destination " +
153                                                    getName() + " and reuse previous one");
154                            }
155    
156                            threadPoolExecutor.shutdownNow();
157    
158                            threadPoolExecutor = oldThreadPoolExecutor;
159                    }
160    
161                    _threadPoolExecutor = threadPoolExecutor;
162            }
163    
164            @Override
165            public void send(Message message) {
166                    if (messageListeners.isEmpty()) {
167                            if (_log.isDebugEnabled()) {
168                                    _log.debug("No message listeners for destination " + getName());
169                            }
170    
171                            return;
172                    }
173    
174                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
175    
176                    if (threadPoolExecutor.isShutdown()) {
177                            throw new IllegalStateException(
178                                    "Destination " + getName() + " is shutdown and cannot " +
179                                            "receive more messages");
180                    }
181    
182                    populateMessageFromThreadLocals(message);
183    
184                    if (_log.isDebugEnabled()) {
185                            _log.debug(
186                                    "Sending message " + message + " from destination " +
187                                            getName() + " to message listeners " + messageListeners);
188                    }
189    
190                    dispatch(messageListeners, message);
191            }
192    
193            public void setMaximumQueueSize(int maximumQueueSize) {
194                    _maximumQueueSize = maximumQueueSize;
195            }
196    
197            public void setRejectedExecutionHandler(
198                    RejectedExecutionHandler rejectedExecutionHandler) {
199    
200                    _rejectedExecutionHandler = rejectedExecutionHandler;
201            }
202    
203            public void setWorkersCoreSize(int workersCoreSize) {
204                    _workersCoreSize = workersCoreSize;
205            }
206    
207            public void setWorkersMaxSize(int workersMaxSize) {
208                    _workersMaxSize = workersMaxSize;
209            }
210    
211            protected RejectedExecutionHandler createRejectionExecutionHandler() {
212                    return new RejectedExecutionHandler() {
213    
214                            @Override
215                            public void rejectedExecution(
216                                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
217    
218                                    if (!_log.isWarnEnabled()) {
219                                            return;
220                                    }
221    
222                                    MessageRunnable messageRunnable = (MessageRunnable)runnable;
223    
224                                    _log.warn(
225                                            "Discarding message " + messageRunnable.getMessage() +
226                                                    " because it exceeds the maximum queue size of " +
227                                                            _maximumQueueSize);
228                            }
229    
230                    };
231            }
232    
233            protected abstract void dispatch(
234                    Set<MessageListener> messageListeners, Message message);
235    
236            protected ThreadPoolExecutor getThreadPoolExecutor() {
237                    return _threadPoolExecutor;
238            }
239    
240            protected void populateMessageFromThreadLocals(Message message) {
241                    if (!message.contains("companyId")) {
242                            message.put("companyId", CompanyThreadLocal.getCompanyId());
243                    }
244    
245                    if (!ClusterInvokeThreadLocal.isEnabled()) {
246                            message.put("clusterInvoke", Boolean.FALSE);
247                    }
248    
249                    if (!message.contains("defaultLocale")) {
250                            message.put("defaultLocale", LocaleThreadLocal.getDefaultLocale());
251                    }
252    
253                    if (!message.contains("groupId")) {
254                            message.put("groupId", GroupThreadLocal.getGroupId());
255                    }
256    
257                    if (!message.contains("permissionChecker")) {
258                            message.put(
259                                    "permissionChecker",
260                                    PermissionThreadLocal.getPermissionChecker());
261                    }
262    
263                    if (!message.contains("principalName")) {
264                            message.put("principalName", PrincipalThreadLocal.getName());
265                    }
266    
267                    if (!message.contains("principalPassword")) {
268                            message.put(
269                                    "principalPassword", PrincipalThreadLocal.getPassword());
270                    }
271    
272                    if (!message.contains("siteDefaultLocale")) {
273                            message.put(
274                                    "siteDefaultLocale", LocaleThreadLocal.getSiteDefaultLocale());
275                    }
276    
277                    if (!message.contains("themeDisplayLocale")) {
278                            message.put(
279                                    "themeDisplayLocale",
280                                    LocaleThreadLocal.getThemeDisplayLocale());
281                    }
282            }
283    
284            protected void populateThreadLocalsFromMessage(Message message) {
285                    long companyId = message.getLong("companyId");
286    
287                    if (companyId > 0) {
288                            CompanyThreadLocal.setCompanyId(companyId);
289                    }
290    
291                    Boolean clusterInvoke = (Boolean)message.get("clusterInvoke");
292    
293                    if (clusterInvoke != null) {
294                            ClusterInvokeThreadLocal.setEnabled(clusterInvoke);
295                    }
296    
297                    Locale defaultLocale = (Locale)message.get("defaultLocale");
298    
299                    if (defaultLocale != null) {
300                            LocaleThreadLocal.setDefaultLocale(defaultLocale);
301                    }
302    
303                    long groupId = message.getLong("groupId");
304    
305                    if (groupId > 0) {
306                            GroupThreadLocal.setGroupId(groupId);
307                    }
308    
309                    PermissionChecker permissionChecker = (PermissionChecker)message.get(
310                            "permissionChecker");
311    
312                    String principalName = message.getString("principalName");
313    
314                    if (Validator.isNotNull(principalName)) {
315                            PrincipalThreadLocal.setName(principalName);
316                    }
317    
318                    if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
319                            try {
320                                    User user = UserLocalServiceUtil.fetchUser(
321                                            PrincipalThreadLocal.getUserId());
322    
323                                    permissionChecker = PermissionCheckerFactoryUtil.create(user);
324                            }
325                            catch (Exception e) {
326                                    throw new RuntimeException(e);
327                            }
328                    }
329    
330                    if (permissionChecker != null) {
331                            PermissionThreadLocal.setPermissionChecker(permissionChecker);
332                    }
333    
334                    String principalPassword = message.getString("principalPassword");
335    
336                    if (Validator.isNotNull(principalPassword)) {
337                            PrincipalThreadLocal.setPassword(principalPassword);
338                    }
339    
340                    Locale siteDefaultLocale = (Locale)message.get("siteDefaultLocale");
341    
342                    if (siteDefaultLocale != null) {
343                            LocaleThreadLocal.setSiteDefaultLocale(siteDefaultLocale);
344                    }
345    
346                    Locale themeDisplayLocale = (Locale)message.get("themeDisplayLocale");
347    
348                    if (themeDisplayLocale != null) {
349                            LocaleThreadLocal.setThemeDisplayLocale(themeDisplayLocale);
350                    }
351            }
352    
353            protected volatile PortalExecutorManager portalExecutorManager;
354            protected ServiceTracker<PortalExecutorManager, PortalExecutorManager>
355                    serviceTracker;
356    
357            private static final int _WORKERS_CORE_SIZE = 2;
358    
359            private static final int _WORKERS_MAX_SIZE = 5;
360    
361            private static final Log _log = LogFactoryUtil.getLog(
362                    BaseAsyncDestination.class);
363    
364            private int _maximumQueueSize = Integer.MAX_VALUE;
365            private RejectedExecutionHandler _rejectedExecutionHandler;
366            private ThreadPoolExecutor _threadPoolExecutor;
367            private int _workersCoreSize = _WORKERS_CORE_SIZE;
368            private int _workersMaxSize = _WORKERS_MAX_SIZE;
369    
370            private class PortalExecutorManagerServiceTrackerCustomizer
371                    implements ServiceTrackerCustomizer
372                            <PortalExecutorManager, PortalExecutorManager> {
373    
374                    @Override
375                    public PortalExecutorManager addingService(
376                            ServiceReference<PortalExecutorManager> serviceReference) {
377    
378                            Registry registry = RegistryUtil.getRegistry();
379    
380                            portalExecutorManager = registry.getService(serviceReference);
381    
382                            open();
383    
384                            return portalExecutorManager;
385                    }
386    
387                    @Override
388                    public void modifiedService(
389                            ServiceReference<PortalExecutorManager> serviceReference,
390                            PortalExecutorManager portalExecutorManager) {
391                    }
392    
393                    @Override
394                    public void removedService(
395                            ServiceReference<PortalExecutorManager> serviceReference,
396                            PortalExecutorManager portalExecutorManager) {
397    
398                            portalExecutorManager = null;
399                    }
400    
401            }
402    
403    }