001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.util.Collection;
022 import java.util.HashMap;
023 import java.util.Map;
024 import java.util.Set;
025
026
029 public class DefaultMessageBus implements MessageBus {
030
031 public synchronized void addDestination(Destination destination) {
032 _destinations.put(destination.getName(), destination);
033
034 fireDestinationAddedEvent(destination);
035 }
036
037 public void addDestinationEventListener(
038 DestinationEventListener destinationEventListener) {
039
040 _destinationEventListeners.add(destinationEventListener);
041 }
042
043 public void addDestinationEventListener(
044 String destinationName,
045 DestinationEventListener destinationEventListener) {
046
047 Destination destination = _destinations.get(destinationName);
048
049 if (destination != null) {
050 destination.addDestinationEventListener(destinationEventListener);
051 }
052 }
053
054 public void destroy() {
055 shutdown(true);
056 }
057
058 public Destination getDestination(String destinationName) {
059 return _destinations.get(destinationName);
060 }
061
062 public int getDestinationCount() {
063 return _destinations.size();
064 }
065
066 public Collection<String> getDestinationNames() {
067 return _destinations.keySet();
068 }
069
070 public Collection<Destination> getDestinations() {
071 return _destinations.values();
072 }
073
074 public boolean hasDestination(String destinationName) {
075 return _destinations.containsKey(destinationName);
076 }
077
078 public boolean hasMessageListener(String destinationName) {
079 Destination destination = _destinations.get(destinationName);
080
081 if ((destination != null) && destination.isRegistered()) {
082 return true;
083 }
084 else {
085 return false;
086 }
087 }
088
089 public synchronized boolean registerMessageListener(
090 String destinationName, MessageListener messageListener) {
091
092 Destination destination = _destinations.get(destinationName);
093
094 if (destination == null) {
095 throw new IllegalStateException(
096 "Destination " + destinationName + " is not configured");
097 }
098
099 boolean registered = destination.register(messageListener);
100
101 if (registered) {
102 fireMessageListenerRegisteredEvent(destination, messageListener);
103 }
104
105 return registered;
106 }
107
108 public synchronized Destination removeDestination(String destinationName) {
109 Destination destinationModel = _destinations.remove(destinationName);
110
111 if (destinationModel != null) {
112 destinationModel.removeDestinationEventListeners();
113 destinationModel.unregisterMessageListeners();
114
115 fireDestinationRemovedEvent(destinationModel);
116 }
117
118 return destinationModel;
119 }
120
121 public void removeDestinationEventListener(
122 DestinationEventListener destinationEventListener) {
123
124 _destinationEventListeners.remove(destinationEventListener);
125 }
126
127 public void removeDestinationEventListener(
128 String destinationName,
129 DestinationEventListener destinationEventListener) {
130
131 Destination destination = _destinations.get(destinationName);
132
133 if (destination != null) {
134 destination.removeDestinationEventListener(
135 destinationEventListener);
136 }
137 }
138
139 public void replace(Destination destination) {
140 Destination oldDestination = _destinations.get(destination.getName());
141
142 oldDestination.copyDestinationEventListeners(destination);
143 oldDestination.copyMessageListeners(destination);
144
145 removeDestination(oldDestination.getName());
146
147 addDestination(destination);
148 }
149
150 public void sendMessage(String destinationName, Message message) {
151 Destination destination = _destinations.get(destinationName);
152
153 if (destination == null) {
154 if (_log.isWarnEnabled()) {
155 _log.warn(
156 "Destination " + destinationName + " is not configured");
157 }
158
159 return;
160 }
161
162 message.setDestinationName(destinationName);
163
164 destination.send(message);
165 }
166
167 public void shutdown() {
168 shutdown(false);
169 }
170
171 public synchronized void shutdown(boolean force) {
172 for (Destination destination : _destinations.values()) {
173 destination.close(force);
174 }
175 }
176
177 public synchronized boolean unregisterMessageListener(
178 String destinationName, MessageListener messageListener) {
179
180 Destination destination = _destinations.get(destinationName);
181
182 if (destination == null) {
183 return false;
184 }
185
186 boolean unregistered = destination.unregister(messageListener);
187
188 if (unregistered) {
189 fireMessageListenerUnregisteredEvent(destination, messageListener);
190 }
191
192 return unregistered;
193 }
194
195 protected void fireDestinationAddedEvent(Destination destination) {
196 for (DestinationEventListener listener : _destinationEventListeners) {
197 listener.destinationAdded(destination);
198 }
199 }
200
201 protected void fireDestinationRemovedEvent(Destination destination) {
202 for (DestinationEventListener listener : _destinationEventListeners) {
203 listener.destinationRemoved(destination);
204 }
205 }
206
207 protected void fireMessageListenerRegisteredEvent(
208 Destination destination, MessageListener messageListener) {
209
210 for (DestinationEventListener destinationEventListener :
211 _destinationEventListeners) {
212
213 destinationEventListener.messageListenerRegistered(
214 destination.getName(), messageListener);
215 }
216 }
217
218 protected void fireMessageListenerUnregisteredEvent(
219 Destination destination, MessageListener messageListener) {
220
221 for (DestinationEventListener destinationEventListener :
222 _destinationEventListeners) {
223
224 destinationEventListener.messageListenerUnregistered(
225 destination.getName(), messageListener);
226 }
227 }
228
229 private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
230
231 private Set<DestinationEventListener> _destinationEventListeners =
232 new ConcurrentHashSet<DestinationEventListener>();
233 private Map<String, Destination> _destinations =
234 new HashMap<String, Destination>();
235
236 }