001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
018 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
025 import com.liferay.portal.kernel.util.NamedThreadFactory;
026 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027 import com.liferay.portal.kernel.util.Validator;
028 import com.liferay.portal.model.User;
029 import com.liferay.portal.security.auth.CompanyThreadLocal;
030 import com.liferay.portal.security.auth.PrincipalThreadLocal;
031 import com.liferay.portal.security.permission.PermissionChecker;
032 import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
033 import com.liferay.portal.security.permission.PermissionThreadLocal;
034 import com.liferay.portal.service.UserLocalServiceUtil;
035
036 import java.util.Set;
037 import java.util.concurrent.TimeUnit;
038
039
043 public abstract class BaseAsyncDestination extends BaseDestination {
044
045 public BaseAsyncDestination() {
046 }
047
048
051 public BaseAsyncDestination(String name) {
052 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
053 }
054
055
058 public BaseAsyncDestination(
059 String name, int workersCoreSize, int workersMaxSize) {
060
061 this.name = name;
062 _workersCoreSize = workersCoreSize;
063 _workersMaxSize = workersMaxSize;
064
065 open();
066 }
067
068 @Override
069 public void close(boolean force) {
070 PortalExecutorManagerUtil.shutdown(getName(), force);
071 }
072
073 public DestinationStatistics getDestinationStatistics() {
074 DestinationStatistics destinationStatistics =
075 new DestinationStatistics();
076
077 destinationStatistics.setActiveThreadCount(
078 _threadPoolExecutor.getActiveCount());
079 destinationStatistics.setCurrentThreadCount(
080 _threadPoolExecutor.getPoolSize());
081 destinationStatistics.setLargestThreadCount(
082 _threadPoolExecutor.getLargestPoolSize());
083 destinationStatistics.setMaxThreadPoolSize(
084 _threadPoolExecutor.getMaxPoolSize());
085 destinationStatistics.setMinThreadPoolSize(
086 _threadPoolExecutor.getCorePoolSize());
087 destinationStatistics.setPendingMessageCount(
088 _threadPoolExecutor.getPendingTaskCount());
089 destinationStatistics.setSentMessageCount(
090 _threadPoolExecutor.getCompletedTaskCount());
091
092 return destinationStatistics;
093 }
094
095 public int getMaximumQueueSize() {
096 return _maximumQueueSize;
097 }
098
099 public int getWorkersCoreSize() {
100 return _workersCoreSize;
101 }
102
103 public int getWorkersMaxSize() {
104 return _workersMaxSize;
105 }
106
107 @Override
108 public void open() {
109 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
110 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
111
112 if (_rejectedExecutionHandler == null) {
113 _rejectedExecutionHandler = createRejectionExecutionHandler();
114 }
115
116 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
117 _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
118 _maximumQueueSize, _rejectedExecutionHandler,
119 new NamedThreadFactory(
120 getName(), Thread.NORM_PRIORITY, classLoader),
121 new ThreadPoolHandlerAdapter());
122
123 ThreadPoolExecutor oldThreadPoolExecutor =
124 PortalExecutorManagerUtil.registerPortalExecutor(
125 getName(), threadPoolExecutor);
126
127 if (oldThreadPoolExecutor != null) {
128 if (_log.isWarnEnabled()) {
129 _log.warn(
130 "Abort creating a new thread pool for destination " +
131 getName() + " and reuse previous one");
132 }
133
134 threadPoolExecutor.shutdownNow();
135
136 threadPoolExecutor = oldThreadPoolExecutor;
137 }
138
139 _threadPoolExecutor = threadPoolExecutor;
140 }
141 }
142
143 public void send(Message message) {
144 if (messageListeners.isEmpty()) {
145 if (_log.isDebugEnabled()) {
146 _log.debug("No message listeners for destination " + getName());
147 }
148
149 return;
150 }
151
152 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
153
154 if (threadPoolExecutor.isShutdown()) {
155 throw new IllegalStateException(
156 "Destination " + getName() + " is shutdown and cannot " +
157 "receive more messages");
158 }
159
160 populateMessageFromThreadLocals(message);
161
162 dispatch(messageListeners, message);
163 }
164
165 public void setMaximumQueueSize(int maximumQueueSize) {
166 _maximumQueueSize = maximumQueueSize;
167 }
168
169 public void setRejectedExecutionHandler(
170 RejectedExecutionHandler rejectedExecutionHandler) {
171
172 _rejectedExecutionHandler = rejectedExecutionHandler;
173 }
174
175 public void setWorkersCoreSize(int workersCoreSize) {
176 _workersCoreSize = workersCoreSize;
177 }
178
179 public void setWorkersMaxSize(int workersMaxSize) {
180 _workersMaxSize = workersMaxSize;
181 }
182
183 protected RejectedExecutionHandler createRejectionExecutionHandler() {
184 return new RejectedExecutionHandler() {
185
186 public void rejectedExecution(
187 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
188
189 if (!_log.isWarnEnabled()) {
190 return;
191 }
192
193 MessageRunnable messageRunnable = (MessageRunnable) runnable;
194
195 _log.warn(
196 "Discarding message " + messageRunnable.getMessage() +
197 " because it exceeds the maximum queue size of " +
198 _maximumQueueSize);
199 }
200
201 };
202 }
203
204 protected abstract void dispatch(
205 Set<MessageListener> messageListeners, Message message);
206
207 protected ThreadPoolExecutor getThreadPoolExecutor() {
208 return _threadPoolExecutor;
209 }
210
211 protected void populateMessageFromThreadLocals(Message message) {
212 if (!message.contains("companyId")) {
213 message.put("companyId", CompanyThreadLocal.getCompanyId());
214 }
215
216 if (!message.contains("permissionChecker")) {
217 message.put(
218 "permissionChecker",
219 PermissionThreadLocal.getPermissionChecker());
220 }
221
222 if (!message.contains("principalName")) {
223 message.put("principalName", PrincipalThreadLocal.getName());
224 }
225
226 if (!message.contains("principalPassword")) {
227 message.put(
228 "principalPassword", PrincipalThreadLocal.getPassword());
229 }
230 }
231
232 protected void populateThreadLocalsFromMessage(Message message) {
233 long companyId = message.getLong("companyId");
234
235 if (companyId > 0) {
236 CompanyThreadLocal.setCompanyId(companyId);
237 }
238
239 PermissionChecker permissionChecker = (PermissionChecker)message.get(
240 "permissionChecker");
241
242 String principalName = message.getString("principalName");
243
244 if (Validator.isNotNull(principalName)) {
245 PrincipalThreadLocal.setName(principalName);
246 }
247
248 if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
249 try {
250 User user = UserLocalServiceUtil.fetchUser(
251 PrincipalThreadLocal.getUserId());
252
253 permissionChecker = PermissionCheckerFactoryUtil.create(user);
254 }
255 catch (Exception e) {
256 throw new RuntimeException(e);
257 }
258 }
259
260 if (permissionChecker != null) {
261 PermissionThreadLocal.setPermissionChecker(permissionChecker);
262 }
263
264 String principalPassword = message.getString("principalPassword");
265
266 if (Validator.isNotNull(principalPassword)) {
267 PrincipalThreadLocal.setPassword(principalPassword);
268 }
269
270 Boolean clusterForwardMessage = (Boolean)message.get(
271 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE);
272
273 if (clusterForwardMessage != null) {
274 MessageValuesThreadLocal.setValue(
275 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE, clusterForwardMessage);
276 }
277 }
278
279 private static final int _WORKERS_CORE_SIZE = 2;
280
281 private static final int _WORKERS_MAX_SIZE = 5;
282
283 private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
284
285 private int _maximumQueueSize = Integer.MAX_VALUE;
286 private RejectedExecutionHandler _rejectedExecutionHandler;
287 private ThreadPoolExecutor _threadPoolExecutor;
288 private int _workersCoreSize = _WORKERS_CORE_SIZE;
289 private int _workersMaxSize = _WORKERS_MAX_SIZE;
290
291 }