001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
021 import com.liferay.portal.security.auth.CompanyThreadLocal;
022
023 import java.util.Set;
024
025
033 public class ParallelDestination extends BaseAsyncDestination {
034
035 public ParallelDestination() {
036 }
037
038
041 public ParallelDestination(String name) {
042 super(name);
043 }
044
045
048 public ParallelDestination(
049 String name, int workersCoreSize, int workersMaxSize) {
050
051 super(name, workersCoreSize, workersMaxSize);
052 }
053
054 @Override
055 protected void dispatch(
056 Set<MessageListener> messageListeners, final Message message) {
057
058 if (!message.contains("companyId")) {
059 message.put("companyId", CompanyThreadLocal.getCompanyId());
060 }
061
062 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
063
064 for (final MessageListener messageListener : messageListeners) {
065 Runnable runnable = new MessageRunnable(message) {
066
067 public void run() {
068 long companyId = CompanyThreadLocal.getCompanyId();
069
070 try {
071 long messageCompanyId = message.getLong("companyId");
072
073 if (messageCompanyId > 0) {
074 CompanyThreadLocal.setCompanyId(messageCompanyId);
075 }
076
077 messageListener.receive(message);
078 }
079 catch (MessageListenerException mle) {
080 _log.error("Unable to process message " + message, mle);
081 }
082 finally {
083 CompanyThreadLocal.setCompanyId(companyId);
084
085 CentralizedThreadLocal.clearShortLivedThreadLocals();
086 }
087 }
088
089 };
090
091 threadPoolExecutor.execute(runnable);
092 }
093 }
094
095 private static Log _log = LogFactoryUtil.getLog(ParallelDestination.class);
096
097 }