001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.cache.Lifecycle;
018 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
024 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
025 import com.liferay.portal.kernel.util.Validator;
026 import com.liferay.portal.security.auth.CompanyThreadLocal;
027 import com.liferay.portal.security.auth.PrincipalThreadLocal;
028
029 import java.util.Set;
030
031
039 public class SerialDestination extends BaseAsyncDestination {
040
041 public SerialDestination() {
042 super();
043
044 setWorkersCoreSize(_WORKERS_CORE_SIZE);
045 setWorkersMaxSize(_WORKERS_MAX_SIZE);
046 }
047
048
051 public SerialDestination(String name) {
052 super(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
053 }
054
055 @Override
056 protected void dispatch(
057 final Set<MessageListener> messageListeners, final Message message) {
058
059 if (!message.contains("companyId")) {
060 message.put("companyId", CompanyThreadLocal.getCompanyId());
061 }
062
063 if (!message.contains("principalName")) {
064 message.put("principalName", PrincipalThreadLocal.getName());
065 }
066
067 if (!message.contains("principalPassword")) {
068 message.put(
069 "principalPassword", PrincipalThreadLocal.getPassword());
070 }
071
072 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
073
074 Runnable runnable = new MessageRunnable(message) {
075
076 public void run() {
077 try {
078 long messageCompanyId = message.getLong("companyId");
079
080 if (messageCompanyId > 0) {
081 CompanyThreadLocal.setCompanyId(messageCompanyId);
082 }
083
084 String messagePrincipalName = message.getString(
085 "principalName");
086
087 if (Validator.isNotNull(messagePrincipalName)) {
088 PrincipalThreadLocal.setName(messagePrincipalName);
089 }
090
091 String messagePrincipalPassword = message.getString(
092 "principalPassword");
093
094 if (Validator.isNotNull(messagePrincipalPassword)) {
095 PrincipalThreadLocal.setPassword(
096 messagePrincipalPassword);
097 }
098
099 Boolean clusterForwardMessage = (Boolean)message.get(
100 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE);
101
102 if (clusterForwardMessage != null) {
103 MessageValuesThreadLocal.setValue(
104 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE,
105 clusterForwardMessage);
106 }
107
108 for (MessageListener messageListener : messageListeners) {
109 try {
110 messageListener.receive(message);
111 }
112 catch (MessageListenerException mle) {
113 _log.error(
114 "Unable to process message " + message, mle);
115 }
116 }
117 }
118 finally {
119 ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
120
121 CentralizedThreadLocal.clearShortLivedThreadLocals();
122 }
123 }
124
125 };
126
127 threadPoolExecutor.execute(runnable);
128 }
129
130 private static final int _WORKERS_CORE_SIZE = 1;
131
132 private static final int _WORKERS_MAX_SIZE = 1;
133
134 private static Log _log = LogFactoryUtil.getLog(SerialDestination.class);
135
136 }