001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.cache.Lifecycle;
018 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
024 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
025 import com.liferay.portal.kernel.util.Validator;
026 import com.liferay.portal.security.auth.CompanyThreadLocal;
027 import com.liferay.portal.security.auth.PrincipalThreadLocal;
028
029 import java.util.Set;
030
031
039 public class ParallelDestination extends BaseAsyncDestination {
040
041 public ParallelDestination() {
042 }
043
044
047 public ParallelDestination(String name) {
048 super(name);
049 }
050
051
054 public ParallelDestination(
055 String name, int workersCoreSize, int workersMaxSize) {
056
057 super(name, workersCoreSize, workersMaxSize);
058 }
059
060 @Override
061 protected void dispatch(
062 Set<MessageListener> messageListeners, final Message message) {
063
064 if (!message.contains("companyId")) {
065 message.put("companyId", CompanyThreadLocal.getCompanyId());
066 }
067
068 if (!message.contains("principalName")) {
069 message.put("principalName", PrincipalThreadLocal.getName());
070 }
071
072 if (!message.contains("principalPassword")) {
073 message.put(
074 "principalPassword", PrincipalThreadLocal.getPassword());
075 }
076
077 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
078
079 for (final MessageListener messageListener : messageListeners) {
080 Runnable runnable = new MessageRunnable(message) {
081
082 public void run() {
083 try {
084 long messageCompanyId = message.getLong("companyId");
085
086 if (messageCompanyId > 0) {
087 CompanyThreadLocal.setCompanyId(messageCompanyId);
088 }
089
090 String messagePrincipalName = message.getString(
091 "principalName");
092
093 if (Validator.isNotNull(messagePrincipalName)) {
094 PrincipalThreadLocal.setName(messagePrincipalName);
095 }
096
097 String messagePrincipalPassword = message.getString(
098 "principalPassword");
099
100 if (Validator.isNotNull(messagePrincipalPassword)) {
101 PrincipalThreadLocal.setPassword(
102 messagePrincipalPassword);
103 }
104
105 Boolean clusterForwardMessage = (Boolean)message.get(
106 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE);
107
108 if (clusterForwardMessage != null) {
109 MessageValuesThreadLocal.setValue(
110 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE,
111 clusterForwardMessage);
112 }
113
114 messageListener.receive(message);
115 }
116 catch (MessageListenerException mle) {
117 _log.error("Unable to process message " + message, mle);
118 }
119 finally {
120 ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
121
122 CentralizedThreadLocal.clearShortLivedThreadLocals();
123 }
124 }
125
126 };
127
128 threadPoolExecutor.execute(runnable);
129 }
130 }
131
132 private static Log _log = LogFactoryUtil.getLog(ParallelDestination.class);
133
134 }