001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.cache.Lifecycle;
018    import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
024    import com.liferay.portal.kernel.util.CentralizedThreadLocal;
025    import com.liferay.portal.kernel.util.Validator;
026    import com.liferay.portal.security.auth.CompanyThreadLocal;
027    import com.liferay.portal.security.auth.PrincipalThreadLocal;
028    
029    import java.util.Set;
030    
031    /**
032     * <p>
033     * Destination that delivers a message to a list of message listeners one at a
034     * time.
035     * </p>
036     *
037     * @author Michael C. Han
038     */
039    public class SerialDestination extends BaseAsyncDestination {
040    
041            public SerialDestination() {
042                    super();
043    
044                    setWorkersCoreSize(_WORKERS_CORE_SIZE);
045                    setWorkersMaxSize(_WORKERS_MAX_SIZE);
046            }
047    
048            /**
049             * @deprecated
050             */
051            public SerialDestination(String name) {
052                    super(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
053            }
054    
055            @Override
056            protected void dispatch(
057                    final Set<MessageListener> messageListeners, final Message message) {
058    
059                    if (!message.contains("companyId")) {
060                            message.put("companyId", CompanyThreadLocal.getCompanyId());
061                    }
062    
063                    if (!message.contains("principalName")) {
064                            message.put("principalName", PrincipalThreadLocal.getName());
065                    }
066    
067                    if (!message.contains("principalPassword")) {
068                            message.put(
069                                    "principalPassword", PrincipalThreadLocal.getPassword());
070                    }
071    
072                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
073    
074                    Runnable runnable = new MessageRunnable(message) {
075    
076                            public void run() {
077                                    try {
078                                            long messageCompanyId = message.getLong("companyId");
079    
080                                            if (messageCompanyId > 0) {
081                                                    CompanyThreadLocal.setCompanyId(messageCompanyId);
082                                            }
083    
084                                            String messagePrincipalName = message.getString(
085                                                    "principalName");
086    
087                                            if (Validator.isNotNull(messagePrincipalName)) {
088                                                    PrincipalThreadLocal.setName(messagePrincipalName);
089                                            }
090    
091                                            String messagePrincipalPassword = message.getString(
092                                                    "principalPassword");
093    
094                                            if (Validator.isNotNull(messagePrincipalPassword)) {
095                                                    PrincipalThreadLocal.setPassword(
096                                                            messagePrincipalPassword);
097                                            }
098    
099                                            Boolean clusterForwardMessage = (Boolean)message.get(
100                                                    ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE);
101    
102                                            if (clusterForwardMessage != null) {
103                                                    MessageValuesThreadLocal.setValue(
104                                                            ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE,
105                                                            clusterForwardMessage);
106                                            }
107    
108                                            for (MessageListener messageListener : messageListeners) {
109                                                    try {
110                                                            messageListener.receive(message);
111                                                    }
112                                                    catch (MessageListenerException mle) {
113                                                            _log.error(
114                                                                    "Unable to process message " + message, mle);
115                                                    }
116                                            }
117                                    }
118                                    finally {
119                                            ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
120    
121                                            CentralizedThreadLocal.clearShortLivedThreadLocals();
122                                    }
123                            }
124    
125                    };
126    
127                    threadPoolExecutor.execute(runnable);
128            }
129    
130            private static final int _WORKERS_CORE_SIZE = 1;
131    
132            private static final int _WORKERS_MAX_SIZE = 1;
133    
134            private static Log _log = LogFactoryUtil.getLog(SerialDestination.class);
135    
136    }