001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.cluster.ClusterLink;
018 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
025 import com.liferay.portal.kernel.util.GroupThreadLocal;
026 import com.liferay.portal.kernel.util.LocaleThreadLocal;
027 import com.liferay.portal.kernel.util.NamedThreadFactory;
028 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
029 import com.liferay.portal.kernel.util.Validator;
030 import com.liferay.portal.model.User;
031 import com.liferay.portal.security.auth.CompanyThreadLocal;
032 import com.liferay.portal.security.auth.PrincipalThreadLocal;
033 import com.liferay.portal.security.permission.PermissionChecker;
034 import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
035 import com.liferay.portal.security.permission.PermissionThreadLocal;
036 import com.liferay.portal.service.UserLocalServiceUtil;
037
038 import java.util.Locale;
039 import java.util.Set;
040 import java.util.concurrent.TimeUnit;
041
042
046 public abstract class BaseAsyncDestination extends BaseDestination {
047
/**
 * Creates an unopened destination using the default worker sizes and
 * queue size. No thread pool exists until {@link #open()} is called.
 */
public BaseAsyncDestination() {
	super();
}
050
051
/**
 * Creates and opens a destination with the given name and the default
 * core and maximum worker counts.
 *
 * @param name the destination name
 * @deprecated NOTE(review): presumably superseded by the no-argument
 *             constructor plus setters and an explicit {@link #open()}
 *             call; confirm against the project's deprecation notes.
 */
@Deprecated
public BaseAsyncDestination(String name) {
	this(
		name,
		_WORKERS_CORE_SIZE,
		_WORKERS_MAX_SIZE);
}
058
059
/**
 * Creates a destination with the given name and worker sizes and opens it
 * immediately.
 *
 * @param name the destination name
 * @param workersCoreSize the core size handed to the thread pool executor
 * @param workersMaxSize the maximum size handed to the thread pool
 *        executor
 * @deprecated NOTE(review): presumably superseded by the no-argument
 *             constructor plus setters and an explicit {@link #open()}
 *             call; confirm against the project's deprecation notes.
 */
@Deprecated
public BaseAsyncDestination(
	String name, int workersCoreSize, int workersMaxSize) {

	_workersMaxSize = workersMaxSize;
	_workersCoreSize = workersCoreSize;
	this.name = name;

	// open() is overridable and is invoked here before any subclass
	// constructor body has run.

	open();
}
072
073 @Override
074 public void close(boolean force) {
075 PortalExecutorManagerUtil.shutdown(getName(), force);
076 }
077
078 @Override
079 public DestinationStatistics getDestinationStatistics() {
080 DestinationStatistics destinationStatistics =
081 new DestinationStatistics();
082
083 destinationStatistics.setActiveThreadCount(
084 _threadPoolExecutor.getActiveCount());
085 destinationStatistics.setCurrentThreadCount(
086 _threadPoolExecutor.getPoolSize());
087 destinationStatistics.setLargestThreadCount(
088 _threadPoolExecutor.getLargestPoolSize());
089 destinationStatistics.setMaxThreadPoolSize(
090 _threadPoolExecutor.getMaxPoolSize());
091 destinationStatistics.setMinThreadPoolSize(
092 _threadPoolExecutor.getCorePoolSize());
093 destinationStatistics.setPendingMessageCount(
094 _threadPoolExecutor.getPendingTaskCount());
095 destinationStatistics.setSentMessageCount(
096 _threadPoolExecutor.getCompletedTaskCount());
097
098 return destinationStatistics;
099 }
100
101 public int getMaximumQueueSize() {
102 return _maximumQueueSize;
103 }
104
105 public int getWorkersCoreSize() {
106 return _workersCoreSize;
107 }
108
109 public int getWorkersMaxSize() {
110 return _workersMaxSize;
111 }
112
113 @Override
114 public void open() {
115 if ((_threadPoolExecutor != null) &&
116 !_threadPoolExecutor.isShutdown()) {
117
118 return;
119 }
120
121 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
122
123 if (_rejectedExecutionHandler == null) {
124 _rejectedExecutionHandler = createRejectionExecutionHandler();
125 }
126
127 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
128 _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
129 _maximumQueueSize, _rejectedExecutionHandler,
130 new NamedThreadFactory(
131 getName(), Thread.NORM_PRIORITY, classLoader),
132 new ThreadPoolHandlerAdapter());
133
134 ThreadPoolExecutor oldThreadPoolExecutor =
135 PortalExecutorManagerUtil.registerPortalExecutor(
136 getName(), threadPoolExecutor);
137
138 if (oldThreadPoolExecutor != null) {
139 if (_log.isWarnEnabled()) {
140 _log.warn(
141 "Abort creating a new thread pool for destination " +
142 getName() + " and reuse previous one");
143 }
144
145 threadPoolExecutor.shutdownNow();
146
147 threadPoolExecutor = oldThreadPoolExecutor;
148 }
149
150 _threadPoolExecutor = threadPoolExecutor;
151 }
152
153 @Override
154 public void send(Message message) {
155 if (messageListeners.isEmpty()) {
156 if (_log.isDebugEnabled()) {
157 _log.debug("No message listeners for destination " + getName());
158 }
159
160 return;
161 }
162
163 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
164
165 if (threadPoolExecutor.isShutdown()) {
166 throw new IllegalStateException(
167 "Destination " + getName() + " is shutdown and cannot " +
168 "receive more messages");
169 }
170
171 populateMessageFromThreadLocals(message);
172
173 dispatch(messageListeners, message);
174 }
175
176 public void setMaximumQueueSize(int maximumQueueSize) {
177 _maximumQueueSize = maximumQueueSize;
178 }
179
180 public void setRejectedExecutionHandler(
181 RejectedExecutionHandler rejectedExecutionHandler) {
182
183 _rejectedExecutionHandler = rejectedExecutionHandler;
184 }
185
186 public void setWorkersCoreSize(int workersCoreSize) {
187 _workersCoreSize = workersCoreSize;
188 }
189
190 public void setWorkersMaxSize(int workersMaxSize) {
191 _workersMaxSize = workersMaxSize;
192 }
193
194 protected RejectedExecutionHandler createRejectionExecutionHandler() {
195 return new RejectedExecutionHandler() {
196
197 @Override
198 public void rejectedExecution(
199 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
200
201 if (!_log.isWarnEnabled()) {
202 return;
203 }
204
205 MessageRunnable messageRunnable = (MessageRunnable)runnable;
206
207 _log.warn(
208 "Discarding message " + messageRunnable.getMessage() +
209 " because it exceeds the maximum queue size of " +
210 _maximumQueueSize);
211 }
212
213 };
214 }
215
216 protected abstract void dispatch(
217 Set<MessageListener> messageListeners, Message message);
218
219 protected ThreadPoolExecutor getThreadPoolExecutor() {
220 return _threadPoolExecutor;
221 }
222
223 protected void populateMessageFromThreadLocals(Message message) {
224 if (!message.contains("companyId")) {
225 message.put("companyId", CompanyThreadLocal.getCompanyId());
226 }
227
228 if (!message.contains("defaultLocale")) {
229 message.put("defaultLocale", LocaleThreadLocal.getDefaultLocale());
230 }
231
232 if (!message.contains("groupId")) {
233 message.put("groupId", GroupThreadLocal.getGroupId());
234 }
235
236 if (!message.contains("permissionChecker")) {
237 message.put(
238 "permissionChecker",
239 PermissionThreadLocal.getPermissionChecker());
240 }
241
242 if (!message.contains("principalName")) {
243 message.put("principalName", PrincipalThreadLocal.getName());
244 }
245
246 if (!message.contains("principalPassword")) {
247 message.put(
248 "principalPassword", PrincipalThreadLocal.getPassword());
249 }
250
251 if (!message.contains("siteDefaultLocale")) {
252 message.put(
253 "siteDefaultLocale", LocaleThreadLocal.getSiteDefaultLocale());
254 }
255
256 if (!message.contains("themeDisplayLocale")) {
257 message.put(
258 "themeDisplayLocale",
259 LocaleThreadLocal.getThemeDisplayLocale());
260 }
261 }
262
263 protected void populateThreadLocalsFromMessage(Message message) {
264 long companyId = message.getLong("companyId");
265
266 if (companyId > 0) {
267 CompanyThreadLocal.setCompanyId(companyId);
268 }
269
270 Locale defaultLocale = (Locale)message.get("defaultLocale");
271
272 if (defaultLocale != null) {
273 LocaleThreadLocal.setDefaultLocale(defaultLocale);
274 }
275
276 long groupId = message.getLong("groupId");
277
278 if (groupId > 0) {
279 GroupThreadLocal.setGroupId(groupId);
280 }
281
282 PermissionChecker permissionChecker = (PermissionChecker)message.get(
283 "permissionChecker");
284
285 String principalName = message.getString("principalName");
286
287 if (Validator.isNotNull(principalName)) {
288 PrincipalThreadLocal.setName(principalName);
289 }
290
291 if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
292 try {
293 User user = UserLocalServiceUtil.fetchUser(
294 PrincipalThreadLocal.getUserId());
295
296 permissionChecker = PermissionCheckerFactoryUtil.create(user);
297 }
298 catch (Exception e) {
299 throw new RuntimeException(e);
300 }
301 }
302
303 if (permissionChecker != null) {
304 PermissionThreadLocal.setPermissionChecker(permissionChecker);
305 }
306
307 String principalPassword = message.getString("principalPassword");
308
309 if (Validator.isNotNull(principalPassword)) {
310 PrincipalThreadLocal.setPassword(principalPassword);
311 }
312
313 Boolean clusterForwardMessage = (Boolean)message.get(
314 ClusterLink.CLUSTER_FORWARD_MESSAGE);
315
316 if (clusterForwardMessage != null) {
317 MessageValuesThreadLocal.setValue(
318 ClusterLink.CLUSTER_FORWARD_MESSAGE, clusterForwardMessage);
319 }
320
321 Locale siteDefaultLocale = (Locale)message.get("siteDefaultLocale");
322
323 if (siteDefaultLocale != null) {
324 LocaleThreadLocal.setSiteDefaultLocale(siteDefaultLocale);
325 }
326
327 Locale themeDisplayLocale = (Locale)message.get("themeDisplayLocale");
328
329 if (themeDisplayLocale != null) {
330 LocaleThreadLocal.setThemeDisplayLocale(themeDisplayLocale);
331 }
332 }
333
334 private static final int _WORKERS_CORE_SIZE = 2;
335
336 private static final int _WORKERS_MAX_SIZE = 5;
337
338 private static final Log _log = LogFactoryUtil.getLog(
339 BaseAsyncDestination.class);
340
341 private int _maximumQueueSize = Integer.MAX_VALUE;
342 private RejectedExecutionHandler _rejectedExecutionHandler;
343 private ThreadPoolExecutor _threadPoolExecutor;
344 private int _workersCoreSize = _WORKERS_CORE_SIZE;
345 private int _workersMaxSize = _WORKERS_MAX_SIZE;
346
347 }