001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.cluster.ClusterLink;
018    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022    import com.liferay.portal.kernel.log.Log;
023    import com.liferay.portal.kernel.log.LogFactoryUtil;
024    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
025    import com.liferay.portal.kernel.util.GroupThreadLocal;
026    import com.liferay.portal.kernel.util.LocaleThreadLocal;
027    import com.liferay.portal.kernel.util.NamedThreadFactory;
028    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
029    import com.liferay.portal.kernel.util.Validator;
030    import com.liferay.portal.model.User;
031    import com.liferay.portal.security.auth.CompanyThreadLocal;
032    import com.liferay.portal.security.auth.PrincipalThreadLocal;
033    import com.liferay.portal.security.permission.PermissionChecker;
034    import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
035    import com.liferay.portal.security.permission.PermissionThreadLocal;
036    import com.liferay.portal.service.UserLocalServiceUtil;
037    
038    import java.util.Locale;
039    import java.util.Set;
040    import java.util.concurrent.TimeUnit;
041    
042    /**
043     * @author Michael C. Han
044     * @author Shuyang Zhou
045     */
046    public abstract class BaseAsyncDestination extends BaseDestination {
047    
048            public BaseAsyncDestination() {
049            }
050    
051            /**
052             * @deprecated As of 6.1.0
053             */
054            public BaseAsyncDestination(String name) {
055                    this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
056            }
057    
058            /**
059             * @deprecated As of 6.1.0
060             */
061            public BaseAsyncDestination(
062                    String name, int workersCoreSize, int workersMaxSize) {
063    
064                    this.name = name;
065                    _workersCoreSize = workersCoreSize;
066                    _workersMaxSize = workersMaxSize;
067    
068                    open();
069            }
070    
071            @Override
072            public void close(boolean force) {
073                    PortalExecutorManagerUtil.shutdown(getName(), force);
074            }
075    
076            @Override
077            public DestinationStatistics getDestinationStatistics() {
078                    DestinationStatistics destinationStatistics =
079                            new DestinationStatistics();
080    
081                    destinationStatistics.setActiveThreadCount(
082                            _threadPoolExecutor.getActiveCount());
083                    destinationStatistics.setCurrentThreadCount(
084                            _threadPoolExecutor.getPoolSize());
085                    destinationStatistics.setLargestThreadCount(
086                            _threadPoolExecutor.getLargestPoolSize());
087                    destinationStatistics.setMaxThreadPoolSize(
088                            _threadPoolExecutor.getMaxPoolSize());
089                    destinationStatistics.setMinThreadPoolSize(
090                            _threadPoolExecutor.getCorePoolSize());
091                    destinationStatistics.setPendingMessageCount(
092                            _threadPoolExecutor.getPendingTaskCount());
093                    destinationStatistics.setSentMessageCount(
094                            _threadPoolExecutor.getCompletedTaskCount());
095    
096                    return destinationStatistics;
097            }
098    
099            public int getMaximumQueueSize() {
100                    return _maximumQueueSize;
101            }
102    
103            public int getWorkersCoreSize() {
104                    return _workersCoreSize;
105            }
106    
107            public int getWorkersMaxSize() {
108                    return _workersMaxSize;
109            }
110    
111            @Override
112            public void open() {
113                    if ((_threadPoolExecutor != null) &&
114                            !_threadPoolExecutor.isShutdown()) {
115    
116                            return;
117                    }
118    
119                    ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
120    
121                    if (_rejectedExecutionHandler == null) {
122                            _rejectedExecutionHandler = createRejectionExecutionHandler();
123                    }
124    
125                    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
126                            _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
127                            _maximumQueueSize, _rejectedExecutionHandler,
128                            new NamedThreadFactory(
129                                    getName(), Thread.NORM_PRIORITY, classLoader),
130                            new ThreadPoolHandlerAdapter());
131    
132                    ThreadPoolExecutor oldThreadPoolExecutor =
133                            PortalExecutorManagerUtil.registerPortalExecutor(
134                                    getName(), threadPoolExecutor);
135    
136                    if (oldThreadPoolExecutor != null) {
137                            if (_log.isWarnEnabled()) {
138                                    _log.warn(
139                                            "Abort creating a new thread pool for destination " +
140                                                    getName() + " and reuse previous one");
141                            }
142    
143                            threadPoolExecutor.shutdownNow();
144    
145                            threadPoolExecutor = oldThreadPoolExecutor;
146                    }
147    
148                    _threadPoolExecutor = threadPoolExecutor;
149            }
150    
151            @Override
152            public void send(Message message) {
153                    if (messageListeners.isEmpty()) {
154                            if (_log.isDebugEnabled()) {
155                                    _log.debug("No message listeners for destination " + getName());
156                            }
157    
158                            return;
159                    }
160    
161                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
162    
163                    if (threadPoolExecutor.isShutdown()) {
164                            throw new IllegalStateException(
165                                    "Destination " + getName() + " is shutdown and cannot " +
166                                            "receive more messages");
167                    }
168    
169                    populateMessageFromThreadLocals(message);
170    
171                    if (_log.isDebugEnabled()) {
172                            _log.debug(
173                                    "Sending message " + message + " from destination " +
174                                            getName() + " to message listeners " + messageListeners);
175                    }
176    
177                    dispatch(messageListeners, message);
178            }
179    
180            public void setMaximumQueueSize(int maximumQueueSize) {
181                    _maximumQueueSize = maximumQueueSize;
182            }
183    
184            public void setRejectedExecutionHandler(
185                    RejectedExecutionHandler rejectedExecutionHandler) {
186    
187                    _rejectedExecutionHandler = rejectedExecutionHandler;
188            }
189    
190            public void setWorkersCoreSize(int workersCoreSize) {
191                    _workersCoreSize = workersCoreSize;
192            }
193    
194            public void setWorkersMaxSize(int workersMaxSize) {
195                    _workersMaxSize = workersMaxSize;
196            }
197    
198            protected RejectedExecutionHandler createRejectionExecutionHandler() {
199                    return new RejectedExecutionHandler() {
200    
201                            @Override
202                            public void rejectedExecution(
203                                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
204    
205                                    if (!_log.isWarnEnabled()) {
206                                            return;
207                                    }
208    
209                                    MessageRunnable messageRunnable = (MessageRunnable)runnable;
210    
211                                    _log.warn(
212                                            "Discarding message " + messageRunnable.getMessage() +
213                                                    " because it exceeds the maximum queue size of " +
214                                                            _maximumQueueSize);
215                            }
216    
217                    };
218            }
219    
220            protected abstract void dispatch(
221                    Set<MessageListener> messageListeners, Message message);
222    
223            protected ThreadPoolExecutor getThreadPoolExecutor() {
224                    return _threadPoolExecutor;
225            }
226    
227            protected void populateMessageFromThreadLocals(Message message) {
228                    if (!message.contains("companyId")) {
229                            message.put("companyId", CompanyThreadLocal.getCompanyId());
230                    }
231    
232                    if (!message.contains("defaultLocale")) {
233                            message.put("defaultLocale", LocaleThreadLocal.getDefaultLocale());
234                    }
235    
236                    if (!message.contains("groupId")) {
237                            message.put("groupId", GroupThreadLocal.getGroupId());
238                    }
239    
240                    if (!message.contains("permissionChecker")) {
241                            message.put(
242                                    "permissionChecker",
243                                    PermissionThreadLocal.getPermissionChecker());
244                    }
245    
246                    if (!message.contains("principalName")) {
247                            message.put("principalName", PrincipalThreadLocal.getName());
248                    }
249    
250                    if (!message.contains("principalPassword")) {
251                            message.put(
252                                    "principalPassword", PrincipalThreadLocal.getPassword());
253                    }
254    
255                    if (!message.contains("siteDefaultLocale")) {
256                            message.put(
257                                    "siteDefaultLocale", LocaleThreadLocal.getSiteDefaultLocale());
258                    }
259    
260                    if (!message.contains("themeDisplayLocale")) {
261                            message.put(
262                                    "themeDisplayLocale",
263                                    LocaleThreadLocal.getThemeDisplayLocale());
264                    }
265            }
266    
267            protected void populateThreadLocalsFromMessage(Message message) {
268                    long companyId = message.getLong("companyId");
269    
270                    if (companyId > 0) {
271                            CompanyThreadLocal.setCompanyId(companyId);
272                    }
273    
274                    Locale defaultLocale = (Locale)message.get("defaultLocale");
275    
276                    if (defaultLocale != null) {
277                            LocaleThreadLocal.setDefaultLocale(defaultLocale);
278                    }
279    
280                    long groupId = message.getLong("groupId");
281    
282                    if (groupId > 0) {
283                            GroupThreadLocal.setGroupId(groupId);
284                    }
285    
286                    PermissionChecker permissionChecker = (PermissionChecker)message.get(
287                            "permissionChecker");
288    
289                    String principalName = message.getString("principalName");
290    
291                    if (Validator.isNotNull(principalName)) {
292                            PrincipalThreadLocal.setName(principalName);
293                    }
294    
295                    if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
296                            try {
297                                    User user = UserLocalServiceUtil.fetchUser(
298                                            PrincipalThreadLocal.getUserId());
299    
300                                    permissionChecker = PermissionCheckerFactoryUtil.create(user);
301                            }
302                            catch (Exception e) {
303                                    throw new RuntimeException(e);
304                            }
305                    }
306    
307                    if (permissionChecker != null) {
308                            PermissionThreadLocal.setPermissionChecker(permissionChecker);
309                    }
310    
311                    String principalPassword = message.getString("principalPassword");
312    
313                    if (Validator.isNotNull(principalPassword)) {
314                            PrincipalThreadLocal.setPassword(principalPassword);
315                    }
316    
317                    Boolean clusterForwardMessage = (Boolean)message.get(
318                            ClusterLink.CLUSTER_FORWARD_MESSAGE);
319    
320                    if (clusterForwardMessage != null) {
321                            MessageValuesThreadLocal.setValue(
322                                    ClusterLink.CLUSTER_FORWARD_MESSAGE, clusterForwardMessage);
323                    }
324    
325                    Locale siteDefaultLocale = (Locale)message.get("siteDefaultLocale");
326    
327                    if (siteDefaultLocale != null) {
328                            LocaleThreadLocal.setSiteDefaultLocale(siteDefaultLocale);
329                    }
330    
331                    Locale themeDisplayLocale = (Locale)message.get("themeDisplayLocale");
332    
333                    if (themeDisplayLocale != null) {
334                            LocaleThreadLocal.setThemeDisplayLocale(themeDisplayLocale);
335                    }
336            }
337    
338            private static final int _WORKERS_CORE_SIZE = 2;
339    
340            private static final int _WORKERS_MAX_SIZE = 5;
341    
342            private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
343    
344            private int _maximumQueueSize = Integer.MAX_VALUE;
345            private RejectedExecutionHandler _rejectedExecutionHandler;
346            private ThreadPoolExecutor _threadPoolExecutor;
347            private int _workersCoreSize = _WORKERS_CORE_SIZE;
348            private int _workersMaxSize = _WORKERS_MAX_SIZE;
349    
350    }