001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.util.Collection;
022 import java.util.HashMap;
023 import java.util.Map;
024 import java.util.Set;
025
026
029 public class DefaultMessageBus implements MessageBus {
030
031 @Override
032 public synchronized void addDestination(Destination destination) {
033 _destinations.put(destination.getName(), destination);
034
035 fireDestinationAddedEvent(destination);
036 }
037
038 @Override
039 public void addDestinationEventListener(
040 DestinationEventListener destinationEventListener) {
041
042 _destinationEventListeners.add(destinationEventListener);
043 }
044
045 @Override
046 public void addDestinationEventListener(
047 String destinationName,
048 DestinationEventListener destinationEventListener) {
049
050 Destination destination = _destinations.get(destinationName);
051
052 if (destination != null) {
053 destination.addDestinationEventListener(destinationEventListener);
054 }
055 }
056
057 @Override
058 public Destination getDestination(String destinationName) {
059 return _destinations.get(destinationName);
060 }
061
062 @Override
063 public int getDestinationCount() {
064 return _destinations.size();
065 }
066
067 @Override
068 public Collection<String> getDestinationNames() {
069 return _destinations.keySet();
070 }
071
072 @Override
073 public Collection<Destination> getDestinations() {
074 return _destinations.values();
075 }
076
077 @Override
078 public boolean hasDestination(String destinationName) {
079 return _destinations.containsKey(destinationName);
080 }
081
082 @Override
083 public boolean hasMessageListener(String destinationName) {
084 Destination destination = _destinations.get(destinationName);
085
086 if ((destination != null) && destination.isRegistered()) {
087 return true;
088 }
089 else {
090 return false;
091 }
092 }
093
094 @Override
095 public synchronized boolean registerMessageListener(
096 String destinationName, MessageListener messageListener) {
097
098 Destination destination = _destinations.get(destinationName);
099
100 if (destination == null) {
101 throw new IllegalStateException(
102 "Destination " + destinationName + " is not configured");
103 }
104
105 boolean registered = destination.register(messageListener);
106
107 if (registered) {
108 fireMessageListenerRegisteredEvent(destination, messageListener);
109 }
110
111 return registered;
112 }
113
114 @Override
115 public synchronized Destination removeDestination(String destinationName) {
116 Destination destinationModel = _destinations.remove(destinationName);
117
118 if (destinationModel != null) {
119 destinationModel.removeDestinationEventListeners();
120 destinationModel.unregisterMessageListeners();
121
122 fireDestinationRemovedEvent(destinationModel);
123 }
124
125 return destinationModel;
126 }
127
128 @Override
129 public void removeDestinationEventListener(
130 DestinationEventListener destinationEventListener) {
131
132 _destinationEventListeners.remove(destinationEventListener);
133 }
134
135 @Override
136 public void removeDestinationEventListener(
137 String destinationName,
138 DestinationEventListener destinationEventListener) {
139
140 Destination destination = _destinations.get(destinationName);
141
142 if (destination != null) {
143 destination.removeDestinationEventListener(
144 destinationEventListener);
145 }
146 }
147
148 @Override
149 public void replace(Destination destination) {
150 Destination oldDestination = _destinations.get(destination.getName());
151
152 oldDestination.copyDestinationEventListeners(destination);
153 oldDestination.copyMessageListeners(destination);
154
155 removeDestination(oldDestination.getName());
156
157 addDestination(destination);
158 }
159
160 @Override
161 public void sendMessage(String destinationName, Message message) {
162 Destination destination = _destinations.get(destinationName);
163
164 if (destination == null) {
165 if (_log.isWarnEnabled()) {
166 _log.warn(
167 "Destination " + destinationName + " is not configured");
168 }
169
170 return;
171 }
172
173 message.setDestinationName(destinationName);
174
175 destination.send(message);
176 }
177
178 @Override
179 public void shutdown() {
180 shutdown(false);
181 }
182
183 @Override
184 public synchronized void shutdown(boolean force) {
185 for (Destination destination : _destinations.values()) {
186 destination.close(force);
187 }
188 }
189
190 @Override
191 public synchronized boolean unregisterMessageListener(
192 String destinationName, MessageListener messageListener) {
193
194 Destination destination = _destinations.get(destinationName);
195
196 if (destination == null) {
197 return false;
198 }
199
200 boolean unregistered = destination.unregister(messageListener);
201
202 if (unregistered) {
203 fireMessageListenerUnregisteredEvent(destination, messageListener);
204 }
205
206 return unregistered;
207 }
208
209 protected void fireDestinationAddedEvent(Destination destination) {
210 for (DestinationEventListener listener : _destinationEventListeners) {
211 listener.destinationAdded(destination);
212 }
213 }
214
215 protected void fireDestinationRemovedEvent(Destination destination) {
216 for (DestinationEventListener listener : _destinationEventListeners) {
217 listener.destinationRemoved(destination);
218 }
219 }
220
221 protected void fireMessageListenerRegisteredEvent(
222 Destination destination, MessageListener messageListener) {
223
224 for (DestinationEventListener destinationEventListener :
225 _destinationEventListeners) {
226
227 destinationEventListener.messageListenerRegistered(
228 destination.getName(), messageListener);
229 }
230 }
231
232 protected void fireMessageListenerUnregisteredEvent(
233 Destination destination, MessageListener messageListener) {
234
235 for (DestinationEventListener destinationEventListener :
236 _destinationEventListeners) {
237
238 destinationEventListener.messageListenerUnregistered(
239 destination.getName(), messageListener);
240 }
241 }
242
243 private static final Log _log = LogFactoryUtil.getLog(
244 DefaultMessageBus.class);
245
246 private final Set<DestinationEventListener> _destinationEventListeners =
247 new ConcurrentHashSet<DestinationEventListener>();
248 private final Map<String, Destination> _destinations =
249 new HashMap<String, Destination>();
250
251 }