/**
 * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
 *
 * This library is free software; you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation; either version 2.1 of the License, or (at your option)
 * any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
 * details.
 */

package com.liferay.portal.kernel.messaging;

import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

026    /**
027     * @author Michael C. Han
028     */
029    public class DefaultMessageBus implements MessageBus {
030    
031            public synchronized void addDestination(Destination destination) {
032                    _destinations.put(destination.getName(), destination);
033    
034                    fireDestinationAddedEvent(destination);
035            }
036    
037            public void addDestinationEventListener(
038                    DestinationEventListener destinationEventListener) {
039    
040                    _destinationEventListeners.add(destinationEventListener);
041            }
042    
043            public void addDestinationEventListener(
044                    String destinationName,
045                    DestinationEventListener destinationEventListener) {
046    
047                    Destination destination = _destinations.get(destinationName);
048    
049                    if (destination != null) {
050                            destination.addDestinationEventListener(destinationEventListener);
051                    }
052            }
053    
054            public void destroy() {
055                    shutdown(true);
056            }
057    
058            public Destination getDestination(String destinationName) {
059                    return _destinations.get(destinationName);
060            }
061    
062            public int getDestinationCount() {
063                    return _destinations.size();
064            }
065    
066            public Collection<String> getDestinationNames() {
067                    return _destinations.keySet();
068            }
069    
070            public Collection<Destination> getDestinations() {
071                    return _destinations.values();
072            }
073    
074            public boolean hasDestination(String destinationName) {
075                    return _destinations.containsKey(destinationName);
076            }
077    
078            public boolean hasMessageListener(String destinationName) {
079                    Destination destination = _destinations.get(destinationName);
080    
081                    if ((destination != null) && destination.isRegistered()) {
082                            return true;
083                    }
084                    else {
085                            return false;
086                    }
087            }
088    
089            public synchronized boolean registerMessageListener(
090                    String destinationName, MessageListener messageListener) {
091    
092                    Destination destination = _destinations.get(destinationName);
093    
094                    if (destination == null) {
095                            throw new IllegalStateException(
096                                    "Destination " + destinationName + " is not configured");
097                    }
098    
099                    boolean registered = destination.register(messageListener);
100    
101                    if (registered) {
102                            fireMessageListenerRegisteredEvent(destination, messageListener);
103                    }
104    
105                    return registered;
106            }
107    
108            public synchronized Destination removeDestination(String destinationName) {
109                    Destination destinationModel = _destinations.remove(destinationName);
110    
111                    if (destinationModel != null) {
112                            destinationModel.removeDestinationEventListeners();
113                            destinationModel.unregisterMessageListeners();
114    
115                            fireDestinationRemovedEvent(destinationModel);
116                    }
117    
118                    return destinationModel;
119            }
120    
121            public void removeDestinationEventListener(
122                    DestinationEventListener destinationEventListener) {
123    
124                    _destinationEventListeners.remove(destinationEventListener);
125            }
126    
127            public void removeDestinationEventListener(
128                    String destinationName,
129                    DestinationEventListener destinationEventListener) {
130    
131                    Destination destination = _destinations.get(destinationName);
132    
133                    if (destination != null) {
134                            destination.removeDestinationEventListener(
135                                    destinationEventListener);
136                    }
137            }
138    
139            public void replace(Destination destination) {
140                    Destination oldDestination = _destinations.get(destination.getName());
141    
142                    oldDestination.copyDestinationEventListeners(destination);
143                    oldDestination.copyMessageListeners(destination);
144    
145                    removeDestination(oldDestination.getName());
146    
147                    addDestination(destination);
148            }
149    
150            public void sendMessage(String destinationName, Message message) {
151                    Destination destination = _destinations.get(destinationName);
152    
153                    if (destination == null) {
154                            if (_log.isWarnEnabled()) {
155                                    _log.warn(
156                                            "Destination " + destinationName + " is not configured");
157                            }
158    
159                            return;
160                    }
161    
162                    message.setDestinationName(destinationName);
163    
164                    destination.send(message);
165            }
166    
167            public void shutdown() {
168                    shutdown(false);
169            }
170    
171            public synchronized void shutdown(boolean force) {
172                    for (Destination destination : _destinations.values()) {
173                            destination.close(force);
174                    }
175            }
176    
177            public synchronized boolean unregisterMessageListener(
178                    String destinationName, MessageListener messageListener) {
179    
180                    Destination destination = _destinations.get(destinationName);
181    
182                    if (destination == null) {
183                            return false;
184                    }
185    
186                    boolean unregistered = destination.unregister(messageListener);
187    
188                    if (unregistered) {
189                            fireMessageListenerUnregisteredEvent(destination, messageListener);
190                    }
191    
192                    return unregistered;
193            }
194    
195            protected void fireDestinationAddedEvent(Destination destination) {
196                    for (DestinationEventListener listener : _destinationEventListeners) {
197                            listener.destinationAdded(destination);
198                    }
199            }
200    
201            protected void fireDestinationRemovedEvent(Destination destination) {
202                    for (DestinationEventListener listener : _destinationEventListeners) {
203                            listener.destinationRemoved(destination);
204                    }
205            }
206    
207            protected void fireMessageListenerRegisteredEvent(
208                    Destination destination, MessageListener messageListener) {
209    
210                    for (DestinationEventListener destinationEventListener :
211                                    _destinationEventListeners) {
212    
213                            destinationEventListener.messageListenerRegistered(
214                                    destination.getName(), messageListener);
215                    }
216            }
217    
218            protected void fireMessageListenerUnregisteredEvent(
219                    Destination destination, MessageListener messageListener) {
220    
221                    for (DestinationEventListener destinationEventListener :
222                                    _destinationEventListeners) {
223    
224                            destinationEventListener.messageListenerUnregistered(
225                                    destination.getName(), messageListener);
226                    }
227            }
228    
229            private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
230    
231            private Set<DestinationEventListener> _destinationEventListeners =
232                    new ConcurrentHashSet<DestinationEventListener>();
233            private Map<String, Destination> _destinations =
234                    new HashMap<String, Destination>();
235    
236    }