001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
018 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021 import com.liferay.portal.kernel.executor.PortalExecutorManager;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.util.GroupThreadLocal;
025 import com.liferay.portal.kernel.util.LocaleThreadLocal;
026 import com.liferay.portal.kernel.util.NamedThreadFactory;
027 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
028 import com.liferay.portal.kernel.util.Validator;
029 import com.liferay.portal.model.User;
030 import com.liferay.portal.security.auth.CompanyThreadLocal;
031 import com.liferay.portal.security.auth.PrincipalThreadLocal;
032 import com.liferay.portal.security.permission.PermissionChecker;
033 import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
034 import com.liferay.portal.security.permission.PermissionThreadLocal;
035 import com.liferay.portal.service.UserLocalServiceUtil;
036 import com.liferay.registry.Registry;
037 import com.liferay.registry.RegistryUtil;
038 import com.liferay.registry.ServiceReference;
039 import com.liferay.registry.ServiceTracker;
040 import com.liferay.registry.ServiceTrackerCustomizer;
041
042 import java.util.Locale;
043 import java.util.Set;
044 import java.util.concurrent.TimeUnit;
045
046
050 public abstract class BaseAsyncDestination extends BaseDestination {
051
052 @Override
053 public void afterPropertiesSet() {
054 super.afterPropertiesSet();
055
056 Registry registry = RegistryUtil.getRegistry();
057
058 serviceTracker = registry.trackServices(
059 PortalExecutorManager.class,
060 new PortalExecutorManagerServiceTrackerCustomizer());
061
062 serviceTracker.open();
063 }
064
065 @Override
066 public void close(boolean force) {
067 if (portalExecutorManager == null) {
068 return;
069 }
070
071 ThreadPoolExecutor threadPoolExecutor =
072 portalExecutorManager.getPortalExecutor(getName());
073
074 if (force) {
075 threadPoolExecutor.shutdownNow();
076 }
077 else {
078 threadPoolExecutor.shutdown();
079 }
080 }
081
082 @Override
083 public void destroy() {
084 super.destroy();
085
086 serviceTracker.close();
087 }
088
089 @Override
090 public DestinationStatistics getDestinationStatistics() {
091 DestinationStatistics destinationStatistics =
092 new DestinationStatistics();
093
094 destinationStatistics.setActiveThreadCount(
095 _threadPoolExecutor.getActiveCount());
096 destinationStatistics.setCurrentThreadCount(
097 _threadPoolExecutor.getPoolSize());
098 destinationStatistics.setLargestThreadCount(
099 _threadPoolExecutor.getLargestPoolSize());
100 destinationStatistics.setMaxThreadPoolSize(
101 _threadPoolExecutor.getMaxPoolSize());
102 destinationStatistics.setMinThreadPoolSize(
103 _threadPoolExecutor.getCorePoolSize());
104 destinationStatistics.setPendingMessageCount(
105 _threadPoolExecutor.getPendingTaskCount());
106 destinationStatistics.setSentMessageCount(
107 _threadPoolExecutor.getCompletedTaskCount());
108
109 return destinationStatistics;
110 }
111
112 public int getMaximumQueueSize() {
113 return _maximumQueueSize;
114 }
115
116 public int getWorkersCoreSize() {
117 return _workersCoreSize;
118 }
119
120 public int getWorkersMaxSize() {
121 return _workersMaxSize;
122 }
123
124 @Override
125 public void open() {
126 if ((_threadPoolExecutor != null) &&
127 !_threadPoolExecutor.isShutdown()) {
128
129 return;
130 }
131
132 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
133
134 if (_rejectedExecutionHandler == null) {
135 _rejectedExecutionHandler = createRejectionExecutionHandler();
136 }
137
138 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
139 _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
140 _maximumQueueSize, _rejectedExecutionHandler,
141 new NamedThreadFactory(
142 getName(), Thread.NORM_PRIORITY, classLoader),
143 new ThreadPoolHandlerAdapter());
144
145 ThreadPoolExecutor oldThreadPoolExecutor =
146 portalExecutorManager.registerPortalExecutor(
147 getName(), threadPoolExecutor);
148
149 if (oldThreadPoolExecutor != null) {
150 if (_log.isWarnEnabled()) {
151 _log.warn(
152 "Abort creating a new thread pool for destination " +
153 getName() + " and reuse previous one");
154 }
155
156 threadPoolExecutor.shutdownNow();
157
158 threadPoolExecutor = oldThreadPoolExecutor;
159 }
160
161 _threadPoolExecutor = threadPoolExecutor;
162 }
163
164 @Override
165 public void send(Message message) {
166 if (messageListeners.isEmpty()) {
167 if (_log.isDebugEnabled()) {
168 _log.debug("No message listeners for destination " + getName());
169 }
170
171 return;
172 }
173
174 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
175
176 if (threadPoolExecutor.isShutdown()) {
177 throw new IllegalStateException(
178 "Destination " + getName() + " is shutdown and cannot " +
179 "receive more messages");
180 }
181
182 populateMessageFromThreadLocals(message);
183
184 if (_log.isDebugEnabled()) {
185 _log.debug(
186 "Sending message " + message + " from destination " +
187 getName() + " to message listeners " + messageListeners);
188 }
189
190 dispatch(messageListeners, message);
191 }
192
193 public void setMaximumQueueSize(int maximumQueueSize) {
194 _maximumQueueSize = maximumQueueSize;
195 }
196
197 public void setRejectedExecutionHandler(
198 RejectedExecutionHandler rejectedExecutionHandler) {
199
200 _rejectedExecutionHandler = rejectedExecutionHandler;
201 }
202
203 public void setWorkersCoreSize(int workersCoreSize) {
204 _workersCoreSize = workersCoreSize;
205 }
206
207 public void setWorkersMaxSize(int workersMaxSize) {
208 _workersMaxSize = workersMaxSize;
209 }
210
211 protected RejectedExecutionHandler createRejectionExecutionHandler() {
212 return new RejectedExecutionHandler() {
213
214 @Override
215 public void rejectedExecution(
216 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
217
218 if (!_log.isWarnEnabled()) {
219 return;
220 }
221
222 MessageRunnable messageRunnable = (MessageRunnable)runnable;
223
224 _log.warn(
225 "Discarding message " + messageRunnable.getMessage() +
226 " because it exceeds the maximum queue size of " +
227 _maximumQueueSize);
228 }
229
230 };
231 }
232
233 protected abstract void dispatch(
234 Set<MessageListener> messageListeners, Message message);
235
236 protected ThreadPoolExecutor getThreadPoolExecutor() {
237 return _threadPoolExecutor;
238 }
239
240 protected void populateMessageFromThreadLocals(Message message) {
241 if (!message.contains("companyId")) {
242 message.put("companyId", CompanyThreadLocal.getCompanyId());
243 }
244
245 if (!ClusterInvokeThreadLocal.isEnabled()) {
246 message.put("clusterInvoke", Boolean.FALSE);
247 }
248
249 if (!message.contains("defaultLocale")) {
250 message.put("defaultLocale", LocaleThreadLocal.getDefaultLocale());
251 }
252
253 if (!message.contains("groupId")) {
254 message.put("groupId", GroupThreadLocal.getGroupId());
255 }
256
257 if (!message.contains("permissionChecker")) {
258 message.put(
259 "permissionChecker",
260 PermissionThreadLocal.getPermissionChecker());
261 }
262
263 if (!message.contains("principalName")) {
264 message.put("principalName", PrincipalThreadLocal.getName());
265 }
266
267 if (!message.contains("principalPassword")) {
268 message.put(
269 "principalPassword", PrincipalThreadLocal.getPassword());
270 }
271
272 if (!message.contains("siteDefaultLocale")) {
273 message.put(
274 "siteDefaultLocale", LocaleThreadLocal.getSiteDefaultLocale());
275 }
276
277 if (!message.contains("themeDisplayLocale")) {
278 message.put(
279 "themeDisplayLocale",
280 LocaleThreadLocal.getThemeDisplayLocale());
281 }
282 }
283
284 protected void populateThreadLocalsFromMessage(Message message) {
285 long companyId = message.getLong("companyId");
286
287 if (companyId > 0) {
288 CompanyThreadLocal.setCompanyId(companyId);
289 }
290
291 Boolean clusterInvoke = (Boolean)message.get("clusterInvoke");
292
293 if (clusterInvoke != null) {
294 ClusterInvokeThreadLocal.setEnabled(clusterInvoke);
295 }
296
297 Locale defaultLocale = (Locale)message.get("defaultLocale");
298
299 if (defaultLocale != null) {
300 LocaleThreadLocal.setDefaultLocale(defaultLocale);
301 }
302
303 long groupId = message.getLong("groupId");
304
305 if (groupId > 0) {
306 GroupThreadLocal.setGroupId(groupId);
307 }
308
309 PermissionChecker permissionChecker = (PermissionChecker)message.get(
310 "permissionChecker");
311
312 String principalName = message.getString("principalName");
313
314 if (Validator.isNotNull(principalName)) {
315 PrincipalThreadLocal.setName(principalName);
316 }
317
318 if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
319 try {
320 User user = UserLocalServiceUtil.fetchUser(
321 PrincipalThreadLocal.getUserId());
322
323 permissionChecker = PermissionCheckerFactoryUtil.create(user);
324 }
325 catch (Exception e) {
326 throw new RuntimeException(e);
327 }
328 }
329
330 if (permissionChecker != null) {
331 PermissionThreadLocal.setPermissionChecker(permissionChecker);
332 }
333
334 String principalPassword = message.getString("principalPassword");
335
336 if (Validator.isNotNull(principalPassword)) {
337 PrincipalThreadLocal.setPassword(principalPassword);
338 }
339
340 Locale siteDefaultLocale = (Locale)message.get("siteDefaultLocale");
341
342 if (siteDefaultLocale != null) {
343 LocaleThreadLocal.setSiteDefaultLocale(siteDefaultLocale);
344 }
345
346 Locale themeDisplayLocale = (Locale)message.get("themeDisplayLocale");
347
348 if (themeDisplayLocale != null) {
349 LocaleThreadLocal.setThemeDisplayLocale(themeDisplayLocale);
350 }
351 }
352
353 protected volatile PortalExecutorManager portalExecutorManager;
354 protected ServiceTracker<PortalExecutorManager, PortalExecutorManager>
355 serviceTracker;
356
357 private static final int _WORKERS_CORE_SIZE = 2;
358
359 private static final int _WORKERS_MAX_SIZE = 5;
360
361 private static final Log _log = LogFactoryUtil.getLog(
362 BaseAsyncDestination.class);
363
364 private int _maximumQueueSize = Integer.MAX_VALUE;
365 private RejectedExecutionHandler _rejectedExecutionHandler;
366 private ThreadPoolExecutor _threadPoolExecutor;
367 private int _workersCoreSize = _WORKERS_CORE_SIZE;
368 private int _workersMaxSize = _WORKERS_MAX_SIZE;
369
370 private class PortalExecutorManagerServiceTrackerCustomizer
371 implements ServiceTrackerCustomizer
372 <PortalExecutorManager, PortalExecutorManager> {
373
374 @Override
375 public PortalExecutorManager addingService(
376 ServiceReference<PortalExecutorManager> serviceReference) {
377
378 Registry registry = RegistryUtil.getRegistry();
379
380 portalExecutorManager = registry.getService(serviceReference);
381
382 open();
383
384 return portalExecutorManager;
385 }
386
387 @Override
388 public void modifiedService(
389 ServiceReference<PortalExecutorManager> serviceReference,
390 PortalExecutorManager portalExecutorManager) {
391 }
392
393 @Override
394 public void removedService(
395 ServiceReference<PortalExecutorManager> serviceReference,
396 PortalExecutorManager portalExecutorManager) {
397
398 portalExecutorManager = null;
399 }
400
401 }
402
403 }