001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
021 import com.liferay.portal.kernel.util.Validator;
022 import com.liferay.portal.security.auth.CompanyThreadLocal;
023 import com.liferay.portal.security.auth.PrincipalThreadLocal;
024
025 import java.util.Set;
026
027
035 public class SerialDestination extends BaseAsyncDestination {
036
037 public SerialDestination() {
038 super();
039
040 setWorkersCoreSize(_WORKERS_CORE_SIZE);
041 setWorkersMaxSize(_WORKERS_MAX_SIZE);
042 }
043
044
047 public SerialDestination(String name) {
048 super(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
049 }
050
051 @Override
052 protected void dispatch(
053 final Set<MessageListener> messageListeners, final Message message) {
054
055 if (!message.contains("companyId")) {
056 message.put("companyId", CompanyThreadLocal.getCompanyId());
057 }
058
059 if (!message.contains("principalName")) {
060 message.put("principalName", PrincipalThreadLocal.getName());
061 }
062
063 if (!message.contains("principalPassword")) {
064 message.put(
065 "principalPassword", PrincipalThreadLocal.getPassword());
066 }
067
068 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
069
070 Runnable runnable = new MessageRunnable(message) {
071
072 public void run() {
073 long companyId = CompanyThreadLocal.getCompanyId();
074 String principalName = PrincipalThreadLocal.getName();
075 String principalPassword = PrincipalThreadLocal.getPassword();
076
077 try {
078 long messageCompanyId = message.getLong("companyId");
079
080 if (messageCompanyId > 0) {
081 CompanyThreadLocal.setCompanyId(messageCompanyId);
082 }
083
084 String messagePrincipalName = message.getString(
085 "principalName");
086
087 if (Validator.isNotNull(messagePrincipalName)) {
088 PrincipalThreadLocal.setName(messagePrincipalName);
089 }
090
091 String messagePrincipalPassword = message.getString(
092 "principalPassword");
093
094 if (Validator.isNotNull(messagePrincipalPassword)) {
095 PrincipalThreadLocal.setPassword(
096 messagePrincipalPassword);
097 }
098
099 for (MessageListener messageListener : messageListeners) {
100 try {
101 messageListener.receive(message);
102 }
103 catch (MessageListenerException mle) {
104 _log.error(
105 "Unable to process message " + message, mle);
106 }
107 }
108 }
109 finally {
110 CompanyThreadLocal.setCompanyId(companyId);
111 PrincipalThreadLocal.setName(principalName);
112 PrincipalThreadLocal.setPassword(principalPassword);
113
114 CentralizedThreadLocal.clearShortLivedThreadLocals();
115 }
116 }
117
118 };
119
120 threadPoolExecutor.execute(runnable);
121 }
122
123 private static final int _WORKERS_CORE_SIZE = 1;
124
125 private static final int _WORKERS_MAX_SIZE = 1;
126
127 private static Log _log = LogFactoryUtil.getLog(SerialDestination.class);
128
129 }