001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    import com.liferay.portal.kernel.util.CentralizedThreadLocal;
021    import com.liferay.portal.kernel.util.Validator;
022    import com.liferay.portal.security.auth.CompanyThreadLocal;
023    import com.liferay.portal.security.auth.PrincipalThreadLocal;
024    
025    import java.util.Set;
026    
027    /**
028     * <p>
029     * Destination that delivers a message to a list of message listeners one at a
030     * time.
031     * </p>
032     *
033     * @author Michael C. Han
034     */
035    public class SerialDestination extends BaseAsyncDestination {
036    
037            public SerialDestination() {
038                    super();
039    
040                    setWorkersCoreSize(_WORKERS_CORE_SIZE);
041                    setWorkersMaxSize(_WORKERS_MAX_SIZE);
042            }
043    
044            /**
045             * @deprecated
046             */
047            public SerialDestination(String name) {
048                    super(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
049            }
050    
051            @Override
052            protected void dispatch(
053                    final Set<MessageListener> messageListeners, final Message message) {
054    
055                    if (!message.contains("companyId")) {
056                            message.put("companyId", CompanyThreadLocal.getCompanyId());
057                    }
058    
059                    if (!message.contains("principalName")) {
060                            message.put("principalName", PrincipalThreadLocal.getName());
061                    }
062    
063                    if (!message.contains("principalPassword")) {
064                            message.put(
065                                    "principalPassword", PrincipalThreadLocal.getPassword());
066                    }
067    
068                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
069    
070                    Runnable runnable = new MessageRunnable(message) {
071    
072                            public void run() {
073                                    long companyId = CompanyThreadLocal.getCompanyId();
074                                    String principalName = PrincipalThreadLocal.getName();
075                                    String principalPassword = PrincipalThreadLocal.getPassword();
076    
077                                    try {
078                                            long messageCompanyId = message.getLong("companyId");
079    
080                                            if (messageCompanyId > 0) {
081                                                    CompanyThreadLocal.setCompanyId(messageCompanyId);
082                                            }
083    
084                                            String messagePrincipalName = message.getString(
085                                                    "principalName");
086    
087                                            if (Validator.isNotNull(messagePrincipalName)) {
088                                                    PrincipalThreadLocal.setName(messagePrincipalName);
089                                            }
090    
091                                            String messagePrincipalPassword = message.getString(
092                                                    "principalPassword");
093    
094                                            if (Validator.isNotNull(messagePrincipalPassword)) {
095                                                    PrincipalThreadLocal.setPassword(
096                                                            messagePrincipalPassword);
097                                            }
098    
099                                            for (MessageListener messageListener : messageListeners) {
100                                                    try {
101                                                            messageListener.receive(message);
102                                                    }
103                                                    catch (MessageListenerException mle) {
104                                                            _log.error(
105                                                                    "Unable to process message " + message, mle);
106                                                    }
107                                            }
108                                    }
109                                    finally {
110                                            CompanyThreadLocal.setCompanyId(companyId);
111                                            PrincipalThreadLocal.setName(principalName);
112                                            PrincipalThreadLocal.setPassword(principalPassword);
113    
114                                            CentralizedThreadLocal.clearShortLivedThreadLocals();
115                                    }
116                            }
117    
118                    };
119    
120                    threadPoolExecutor.execute(runnable);
121            }
122    
123            private static final int _WORKERS_CORE_SIZE = 1;
124    
125            private static final int _WORKERS_MAX_SIZE = 1;
126    
127            private static Log _log = LogFactoryUtil.getLog(SerialDestination.class);
128    
129    }