001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.cache.Lifecycle;
018    import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
024    import com.liferay.portal.kernel.util.CentralizedThreadLocal;
025    import com.liferay.portal.kernel.util.Validator;
026    import com.liferay.portal.security.auth.CompanyThreadLocal;
027    import com.liferay.portal.security.auth.PrincipalThreadLocal;
028    
029    import java.util.Set;
030    
031    /**
032     * <p>
033     * Destination that delivers a message to a list of message listeners in
034     * parallel.
035     * </p>
036     *
037     * @author Michael C. Han
038     */
039    public class ParallelDestination extends BaseAsyncDestination {
040    
041            public ParallelDestination() {
042            }
043    
044            /**
045             * @deprecated
046             */
047            public ParallelDestination(String name) {
048                    super(name);
049            }
050    
051            /**
052             * @deprecated
053             */
054            public ParallelDestination(
055                    String name, int workersCoreSize, int workersMaxSize) {
056    
057                    super(name, workersCoreSize, workersMaxSize);
058            }
059    
060            @Override
061            protected void dispatch(
062                    Set<MessageListener> messageListeners, final Message message) {
063    
064                    if (!message.contains("companyId")) {
065                            message.put("companyId", CompanyThreadLocal.getCompanyId());
066                    }
067    
068                    if (!message.contains("principalName")) {
069                            message.put("principalName", PrincipalThreadLocal.getName());
070                    }
071    
072                    if (!message.contains("principalPassword")) {
073                            message.put(
074                                    "principalPassword", PrincipalThreadLocal.getPassword());
075                    }
076    
077                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
078    
079                    for (final MessageListener messageListener : messageListeners) {
080                            Runnable runnable = new MessageRunnable(message) {
081    
082                                    public void run() {
083                                            try {
084                                                    long messageCompanyId = message.getLong("companyId");
085    
086                                                    if (messageCompanyId > 0) {
087                                                            CompanyThreadLocal.setCompanyId(messageCompanyId);
088                                                    }
089    
090                                                    String messagePrincipalName = message.getString(
091                                                            "principalName");
092    
093                                                    if (Validator.isNotNull(messagePrincipalName)) {
094                                                            PrincipalThreadLocal.setName(messagePrincipalName);
095                                                    }
096    
097                                                    String messagePrincipalPassword = message.getString(
098                                                            "principalPassword");
099    
100                                                    if (Validator.isNotNull(messagePrincipalPassword)) {
101                                                            PrincipalThreadLocal.setPassword(
102                                                                    messagePrincipalPassword);
103                                                    }
104    
105                                                    Boolean clusterForwardMessage = (Boolean)message.get(
106                                                            ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE);
107    
108                                                    if (clusterForwardMessage != null) {
109                                                            MessageValuesThreadLocal.setValue(
110                                                                    ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE,
111                                                                    clusterForwardMessage);
112                                                    }
113    
114                                                    messageListener.receive(message);
115                                            }
116                                            catch (MessageListenerException mle) {
117                                                    _log.error("Unable to process message " + message, mle);
118                                            }
119                                            finally {
120                                                    ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
121    
122                                                    CentralizedThreadLocal.clearShortLivedThreadLocals();
123                                            }
124                                    }
125    
126                            };
127    
128                            threadPoolExecutor.execute(runnable);
129                    }
130            }
131    
132            private static Log _log = LogFactoryUtil.getLog(ParallelDestination.class);
133    
134    }