001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.messaging.sender.SingleDestinationMessageSenderFactoryUtil;
020 import com.liferay.portal.kernel.messaging.sender.SynchronousMessageSender;
021 import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
022 import com.liferay.portal.kernel.security.pacl.permission.PortalRuntimePermission;
023 import com.liferay.portal.kernel.util.ProxyFactory;
024 import com.liferay.registry.Registry;
025 import com.liferay.registry.RegistryUtil;
026 import com.liferay.registry.ServiceReference;
027 import com.liferay.registry.ServiceTracker;
028 import com.liferay.registry.ServiceTrackerCustomizer;
029
030
034 public class MessageBusUtil {
035
036 public static void addDestination(Destination destination) {
037 getInstance()._addDestination(destination);
038 }
039
040 public static Message createResponseMessage(Message requestMessage) {
041 Message responseMessage = new Message();
042
043 responseMessage.setDestinationName(
044 requestMessage.getResponseDestinationName());
045 responseMessage.setResponseId(requestMessage.getResponseId());
046
047 return responseMessage;
048 }
049
050 public static Message createResponseMessage(
051 Message requestMessage, Object payload) {
052
053 Message responseMessage = createResponseMessage(requestMessage);
054
055 responseMessage.setPayload(payload);
056
057 return responseMessage;
058 }
059
060 public static Destination getDestination(String destinationName) {
061 return getInstance()._getDestination(destinationName);
062 }
063
064 public static MessageBusUtil getInstance() {
065 PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
066
067 return _instance;
068 }
069
070 public static MessageBus getMessageBus() {
071 return _instance._getMessageBus();
072 }
073
074 public static boolean hasMessageListener(String destination) {
075 return getInstance()._hasMessageListener(destination);
076 }
077
078 public static void registerMessageListener(
079 String destinationName, MessageListener messageListener) {
080
081 getInstance()._registerMessageListener(
082 destinationName, messageListener);
083 }
084
085 public static void removeDestination(String destinationName) {
086 getInstance()._removeDestination(destinationName);
087 }
088
089 public static void sendMessage(String destinationName, Message message) {
090 getInstance()._sendMessage(destinationName, message);
091 }
092
093 public static void sendMessage(String destinationName, Object payload) {
094 getInstance()._sendMessage(destinationName, payload);
095 }
096
097 public static Object sendSynchronousMessage(
098 String destinationName, Message message)
099 throws MessageBusException {
100
101 return getInstance()._sendSynchronousMessage(destinationName, message);
102 }
103
104 public static Object sendSynchronousMessage(
105 String destinationName, Message message, long timeout)
106 throws MessageBusException {
107
108 return getInstance()._sendSynchronousMessage(
109 destinationName, message, timeout);
110 }
111
112 public static Object sendSynchronousMessage(
113 String destinationName, Object payload)
114 throws MessageBusException {
115
116 return getInstance()._sendSynchronousMessage(
117 destinationName, payload, null);
118 }
119
120 public static Object sendSynchronousMessage(
121 String destinationName, Object payload, long timeout)
122 throws MessageBusException {
123
124 return getInstance()._sendSynchronousMessage(
125 destinationName, payload, null, timeout);
126 }
127
128 public static Object sendSynchronousMessage(
129 String destinationName, Object payload,
130 String responseDestinationName)
131 throws MessageBusException {
132
133 return getInstance()._sendSynchronousMessage(
134 destinationName, payload, responseDestinationName);
135 }
136
137 public static Object sendSynchronousMessage(
138 String destinationName, Object payload,
139 String responseDestinationName, long timeout)
140 throws MessageBusException {
141
142 return getInstance()._sendSynchronousMessage(
143 destinationName, payload, responseDestinationName, timeout);
144 }
145
146 public static void shutdown() {
147 getInstance()._shutdown();
148 }
149
150 public static void shutdown(boolean force) {
151 getInstance()._shutdown(force);
152 }
153
154 public static boolean unregisterMessageListener(
155 String destinationName, MessageListener messageListener) {
156
157 PortalMessageBusPermission.checkListen(destinationName);
158
159 return getInstance()._unregisterMessageListener(
160 destinationName, messageListener);
161 }
162
163 public MessageBusUtil() {
164 Registry registry = RegistryUtil.getRegistry();
165
166 _serviceTracker = registry.trackServices(
167 MessageBus.class, new MessageBusServiceTrackerCustomizer());
168
169 _serviceTracker.open();
170 }
171
172 public void setSynchronousMessageSenderMode(
173 SynchronousMessageSender.Mode synchronousMessageSenderMode) {
174
175 _synchronousMessageSenderMode = synchronousMessageSenderMode;
176 }
177
178 private void _addDestination(Destination destination) {
179 _getMessageBus().addDestination(destination);
180 }
181
182 private Destination _getDestination(String destinationName) {
183 return _getMessageBus().getDestination(destinationName);
184 }
185
186 private MessageBus _getMessageBus() {
187 try {
188 while (!_initialized && (_serviceTracker.getService() == null)) {
189 if (_log.isDebugEnabled()) {
190 _log.debug("Waiting for a PortalExecutorManager");
191 }
192
193 Thread.sleep(500);
194 }
195 }
196 catch (InterruptedException e) {
197 throw new IllegalStateException(
198 "Unable to initialize MessageBusUtil", e);
199 }
200
201 return _messageBus;
202 }
203
204 private boolean _hasMessageListener(String destinationName) {
205 return _getMessageBus().hasMessageListener(destinationName);
206 }
207
208 private void _registerMessageListener(
209 String destinationName, MessageListener messageListener) {
210
211 PortalMessageBusPermission.checkListen(destinationName);
212
213 _getMessageBus().registerMessageListener(
214 destinationName, messageListener);
215 }
216
217 private void _removeDestination(String destinationName) {
218 _getMessageBus().removeDestination(destinationName);
219 }
220
221 private void _sendMessage(String destinationName, Message message) {
222 PortalMessageBusPermission.checkSend(destinationName);
223
224 _getMessageBus().sendMessage(destinationName, message);
225 }
226
227 private void _sendMessage(String destinationName, Object payload) {
228 PortalMessageBusPermission.checkSend(destinationName);
229
230 Message message = new Message();
231
232 message.setPayload(payload);
233
234 _sendMessage(destinationName, message);
235 }
236
237 private Object _sendSynchronousMessage(
238 String destinationName, Message message)
239 throws MessageBusException {
240
241 PortalMessageBusPermission.checkSend(destinationName);
242
243 SynchronousMessageSender synchronousMessageSender =
244 SingleDestinationMessageSenderFactoryUtil.
245 getSynchronousMessageSender(_synchronousMessageSenderMode);
246
247 return synchronousMessageSender.send(destinationName, message);
248 }
249
250 private Object _sendSynchronousMessage(
251 String destinationName, Message message, long timeout)
252 throws MessageBusException {
253
254 PortalMessageBusPermission.checkSend(destinationName);
255
256 SynchronousMessageSender synchronousMessageSender =
257 SingleDestinationMessageSenderFactoryUtil.
258 getSynchronousMessageSender(_synchronousMessageSenderMode);
259
260 return synchronousMessageSender.send(destinationName, message, timeout);
261 }
262
263 private Object _sendSynchronousMessage(
264 String destinationName, Object payload,
265 String responseDestinationName)
266 throws MessageBusException {
267
268 PortalMessageBusPermission.checkSend(destinationName);
269
270 Message message = new Message();
271
272 message.setResponseDestinationName(responseDestinationName);
273 message.setPayload(payload);
274
275 return _sendSynchronousMessage(destinationName, message);
276 }
277
278 private Object _sendSynchronousMessage(
279 String destinationName, Object payload,
280 String responseDestinationName, long timeout)
281 throws MessageBusException {
282
283 PortalMessageBusPermission.checkSend(destinationName);
284
285 Message message = new Message();
286
287 message.setResponseDestinationName(responseDestinationName);
288 message.setPayload(payload);
289
290 return _sendSynchronousMessage(destinationName, message, timeout);
291 }
292
293 private void _shutdown() {
294 PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
295
296 _getMessageBus().shutdown();
297 }
298
299 private void _shutdown(boolean force) {
300 PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
301
302 _getMessageBus().shutdown(force);
303 }
304
305 private boolean _unregisterMessageListener(
306 String destinationName, MessageListener messageListener) {
307
308 return _getMessageBus().unregisterMessageListener(
309 destinationName, messageListener);
310 }
311
312 private static final Log _log = LogFactoryUtil.getLog(MessageBusUtil.class);
313
314 private static final MessageBusUtil _instance = new MessageBusUtil();
315
316 private static SynchronousMessageSender.Mode _synchronousMessageSenderMode;
317
318 private volatile boolean _initialized;
319 private final MessageBus _messageBus =
320 ProxyFactory.newServiceTrackedInstance(MessageBus.class);
321 private final ServiceTracker<MessageBus, MessageBus> _serviceTracker;
322
323 private class MessageBusServiceTrackerCustomizer
324 implements ServiceTrackerCustomizer<MessageBus, MessageBus> {
325
326 @Override
327 public MessageBus addingService(
328 ServiceReference<MessageBus> serviceReference) {
329
330 _initialized = true;
331
332 return null;
333 }
334
335 @Override
336 public void modifiedService(
337 ServiceReference<MessageBus> serviceReference,
338 MessageBus messageBus) {
339 }
340
341 @Override
342 public void removedService(
343 ServiceReference<MessageBus> serviceReference, MessageBus service) {
344 }
345
346 }
347
348 }