001    /**
002     * Copyright (c) 2000-2011 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    import com.liferay.portal.kernel.util.CentralizedThreadLocal;
021    import com.liferay.portal.kernel.util.Validator;
022    import com.liferay.portal.security.auth.CompanyThreadLocal;
023    import com.liferay.portal.security.auth.PrincipalThreadLocal;
024    
025    import java.util.Set;
026    
027    /**
028     * <p>
029     * Destination that delivers a message to a list of message listeners in
030     * parallel.
031     * </p>
032     *
033     * @author Michael C. Han
034     */
035    public class ParallelDestination extends BaseAsyncDestination {
036    
037            public ParallelDestination() {
038            }
039    
040            /**
041             * @deprecated
042             */
043            public ParallelDestination(String name) {
044                    super(name);
045            }
046    
047            /**
048             * @deprecated
049             */
050            public ParallelDestination(
051                    String name, int workersCoreSize, int workersMaxSize) {
052    
053                    super(name, workersCoreSize, workersMaxSize);
054            }
055    
056            @Override
057            protected void dispatch(
058                    Set<MessageListener> messageListeners, final Message message) {
059    
060                    if (!message.contains("companyId")) {
061                            message.put("companyId", CompanyThreadLocal.getCompanyId());
062                    }
063    
064                    if (!message.contains("principalName")) {
065                            message.put("principalName", PrincipalThreadLocal.getName());
066                    }
067    
068                    if (!message.contains("principalPassword")) {
069                            message.put(
070                                    "principalPassword", PrincipalThreadLocal.getPassword());
071                    }
072    
073                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
074    
075                    for (final MessageListener messageListener : messageListeners) {
076                            Runnable runnable = new MessageRunnable(message) {
077    
078                                    public void run() {
079                                            long companyId = CompanyThreadLocal.getCompanyId();
080                                            String principalName = PrincipalThreadLocal.getName();
081                                            String principalPassword =
082                                                    PrincipalThreadLocal.getPassword();
083    
084                                            try {
085                                                    long messageCompanyId = message.getLong("companyId");
086    
087                                                    if (messageCompanyId > 0) {
088                                                            CompanyThreadLocal.setCompanyId(messageCompanyId);
089                                                    }
090    
091                                                    String messagePrincipalName = message.getString(
092                                                            "principalName");
093    
094                                                    if (Validator.isNotNull(messagePrincipalName)) {
095                                                            PrincipalThreadLocal.setName(messagePrincipalName);
096                                                    }
097    
098                                                    String messagePrincipalPassword = message.getString(
099                                                            "principalPassword");
100    
101                                                    if (Validator.isNotNull(messagePrincipalPassword)) {
102                                                            PrincipalThreadLocal.setPassword(
103                                                                    messagePrincipalPassword);
104                                                    }
105    
106                                                    messageListener.receive(message);
107                                            }
108                                            catch (MessageListenerException mle) {
109                                                    _log.error("Unable to process message " + message, mle);
110                                            }
111                                            finally {
112                                                    CompanyThreadLocal.setCompanyId(companyId);
113                                                    PrincipalThreadLocal.setName(principalName);
114                                                    PrincipalThreadLocal.setPassword(principalPassword);
115    
116                                                    CentralizedThreadLocal.clearShortLivedThreadLocals();
117                                            }
118                                    }
119    
120                            };
121    
122                            threadPoolExecutor.execute(runnable);
123                    }
124            }
125    
126            private static Log _log = LogFactoryUtil.getLog(ParallelDestination.class);
127    
128    }