001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
021 import com.liferay.portal.security.auth.CompanyThreadLocal;
022
023 import java.util.Set;
024
025
033 public class SerialDestination extends BaseAsyncDestination {
034
035 public SerialDestination() {
036 super();
037
038 setWorkersCoreSize(_WORKERS_CORE_SIZE);
039 setWorkersMaxSize(_WORKERS_MAX_SIZE);
040 }
041
042
045 public SerialDestination(String name) {
046 super(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
047 }
048
049 @Override
050 protected void dispatch(
051 final Set<MessageListener> messageListeners, final Message message) {
052
053 if (!message.contains("companyId")) {
054 message.put("companyId", CompanyThreadLocal.getCompanyId());
055 }
056
057 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
058
059 Runnable runnable = new MessageRunnable(message) {
060
061 public void run() {
062 long companyId = CompanyThreadLocal.getCompanyId();
063
064 try {
065 long messageCompanyId = message.getLong("companyId");
066
067 if (messageCompanyId > 0) {
068 CompanyThreadLocal.setCompanyId(messageCompanyId);
069 }
070
071 for (MessageListener messageListener : messageListeners) {
072 try {
073 messageListener.receive(message);
074 }
075 catch (MessageListenerException mle) {
076 _log.error(
077 "Unable to process message " + message, mle);
078 }
079 }
080 }
081 finally {
082 CompanyThreadLocal.setCompanyId(companyId);
083
084 CentralizedThreadLocal.clearShortLivedThreadLocals();
085 }
086 }
087
088 };
089
090 threadPoolExecutor.execute(runnable);
091 }
092
093 private static final int _WORKERS_CORE_SIZE = 1;
094
095 private static final int _WORKERS_MAX_SIZE = 1;
096
097 private static Log _log = LogFactoryUtil.getLog(SerialDestination.class);
098
099 }