001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.messaging.sender.SingleDestinationMessageSenderFactoryUtil;
020    import com.liferay.portal.kernel.messaging.sender.SynchronousMessageSender;
021    import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
022    import com.liferay.portal.kernel.security.pacl.permission.PortalRuntimePermission;
023    import com.liferay.portal.kernel.util.ProxyFactory;
024    import com.liferay.registry.Registry;
025    import com.liferay.registry.RegistryUtil;
026    import com.liferay.registry.ServiceReference;
027    import com.liferay.registry.ServiceTracker;
028    import com.liferay.registry.ServiceTrackerCustomizer;
029    
030    /**
031     * @author Michael C. Han
032     * @author Raymond Aug??
033     */
034    public class MessageBusUtil {
035    
036            public static void addDestination(Destination destination) {
037                    getInstance()._addDestination(destination);
038            }
039    
040            public static Message createResponseMessage(Message requestMessage) {
041                    Message responseMessage = new Message();
042    
043                    responseMessage.setDestinationName(
044                            requestMessage.getResponseDestinationName());
045                    responseMessage.setResponseId(requestMessage.getResponseId());
046    
047                    return responseMessage;
048            }
049    
050            public static Message createResponseMessage(
051                    Message requestMessage, Object payload) {
052    
053                    Message responseMessage = createResponseMessage(requestMessage);
054    
055                    responseMessage.setPayload(payload);
056    
057                    return responseMessage;
058            }
059    
060            public static Destination getDestination(String destinationName) {
061                    return getInstance()._getDestination(destinationName);
062            }
063    
064            public static MessageBusUtil getInstance() {
065                    PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
066    
067                    return _instance;
068            }
069    
070            public static MessageBus getMessageBus() {
071                    return _instance._getMessageBus();
072            }
073    
074            public static boolean hasMessageListener(String destination) {
075                    return getInstance()._hasMessageListener(destination);
076            }
077    
078            public static void registerMessageListener(
079                    String destinationName, MessageListener messageListener) {
080    
081                    getInstance()._registerMessageListener(
082                            destinationName, messageListener);
083            }
084    
085            public static void removeDestination(String destinationName) {
086                    getInstance()._removeDestination(destinationName);
087            }
088    
089            public static void sendMessage(String destinationName, Message message) {
090                    getInstance()._sendMessage(destinationName, message);
091            }
092    
093            public static void sendMessage(String destinationName, Object payload) {
094                    getInstance()._sendMessage(destinationName, payload);
095            }
096    
097            public static Object sendSynchronousMessage(
098                            String destinationName, Message message)
099                    throws MessageBusException {
100    
101                    return getInstance()._sendSynchronousMessage(destinationName, message);
102            }
103    
104            public static Object sendSynchronousMessage(
105                            String destinationName, Message message, long timeout)
106                    throws MessageBusException {
107    
108                    return getInstance()._sendSynchronousMessage(
109                            destinationName, message, timeout);
110            }
111    
112            public static Object sendSynchronousMessage(
113                            String destinationName, Object payload)
114                    throws MessageBusException {
115    
116                    return getInstance()._sendSynchronousMessage(
117                            destinationName, payload, null);
118            }
119    
120            public static Object sendSynchronousMessage(
121                            String destinationName, Object payload, long timeout)
122                    throws MessageBusException {
123    
124                    return getInstance()._sendSynchronousMessage(
125                            destinationName, payload, null, timeout);
126            }
127    
128            public static Object sendSynchronousMessage(
129                            String destinationName, Object payload,
130                            String responseDestinationName)
131                    throws MessageBusException {
132    
133                    return getInstance()._sendSynchronousMessage(
134                            destinationName, payload, responseDestinationName);
135            }
136    
137            public static Object sendSynchronousMessage(
138                            String destinationName, Object payload,
139                            String responseDestinationName, long timeout)
140                    throws MessageBusException {
141    
142                    return getInstance()._sendSynchronousMessage(
143                            destinationName, payload, responseDestinationName, timeout);
144            }
145    
146            public static void shutdown() {
147                    getInstance()._shutdown();
148            }
149    
150            public static void shutdown(boolean force) {
151                    getInstance()._shutdown(force);
152            }
153    
154            public static boolean unregisterMessageListener(
155                    String destinationName, MessageListener messageListener) {
156    
157                    PortalMessageBusPermission.checkListen(destinationName);
158    
159                    return getInstance()._unregisterMessageListener(
160                            destinationName, messageListener);
161            }
162    
163            public MessageBusUtil() {
164                    Registry registry = RegistryUtil.getRegistry();
165    
166                    _serviceTracker = registry.trackServices(
167                            MessageBus.class, new MessageBusServiceTrackerCustomizer());
168    
169                    _serviceTracker.open();
170            }
171    
172            public void setSynchronousMessageSenderMode(
173                    SynchronousMessageSender.Mode synchronousMessageSenderMode) {
174    
175                    _synchronousMessageSenderMode = synchronousMessageSenderMode;
176            }
177    
178            private void _addDestination(Destination destination) {
179                    _getMessageBus().addDestination(destination);
180            }
181    
182            private Destination _getDestination(String destinationName) {
183                    return _getMessageBus().getDestination(destinationName);
184            }
185    
186            private MessageBus _getMessageBus() {
187                    try {
188                            while (!_initialized && (_serviceTracker.getService() == null)) {
189                                    if (_log.isDebugEnabled()) {
190                                            _log.debug("Waiting for a PortalExecutorManager");
191                                    }
192    
193                                    Thread.sleep(500);
194                            }
195                    }
196                    catch (InterruptedException e) {
197                            throw new IllegalStateException(
198                                    "Unable to initialize MessageBusUtil", e);
199                    }
200    
201                    return _messageBus;
202            }
203    
204            private boolean _hasMessageListener(String destinationName) {
205                    return _getMessageBus().hasMessageListener(destinationName);
206            }
207    
208            private void _registerMessageListener(
209                    String destinationName, MessageListener messageListener) {
210    
211                    PortalMessageBusPermission.checkListen(destinationName);
212    
213                    _getMessageBus().registerMessageListener(
214                            destinationName, messageListener);
215            }
216    
217            private void _removeDestination(String destinationName) {
218                    _getMessageBus().removeDestination(destinationName);
219            }
220    
221            private void _sendMessage(String destinationName, Message message) {
222                    PortalMessageBusPermission.checkSend(destinationName);
223    
224                    _getMessageBus().sendMessage(destinationName, message);
225            }
226    
227            private void _sendMessage(String destinationName, Object payload) {
228                    PortalMessageBusPermission.checkSend(destinationName);
229    
230                    Message message = new Message();
231    
232                    message.setPayload(payload);
233    
234                    _sendMessage(destinationName, message);
235            }
236    
237            private Object _sendSynchronousMessage(
238                            String destinationName, Message message)
239                    throws MessageBusException {
240    
241                    PortalMessageBusPermission.checkSend(destinationName);
242    
243                    SynchronousMessageSender synchronousMessageSender =
244                            SingleDestinationMessageSenderFactoryUtil.
245                                    getSynchronousMessageSender(_synchronousMessageSenderMode);
246    
247                    return synchronousMessageSender.send(destinationName, message);
248            }
249    
250            private Object _sendSynchronousMessage(
251                            String destinationName, Message message, long timeout)
252                    throws MessageBusException {
253    
254                    PortalMessageBusPermission.checkSend(destinationName);
255    
256                    SynchronousMessageSender synchronousMessageSender =
257                            SingleDestinationMessageSenderFactoryUtil.
258                                    getSynchronousMessageSender(_synchronousMessageSenderMode);
259    
260                    return synchronousMessageSender.send(destinationName, message, timeout);
261            }
262    
263            private Object _sendSynchronousMessage(
264                            String destinationName, Object payload,
265                            String responseDestinationName)
266                    throws MessageBusException {
267    
268                    PortalMessageBusPermission.checkSend(destinationName);
269    
270                    Message message = new Message();
271    
272                    message.setResponseDestinationName(responseDestinationName);
273                    message.setPayload(payload);
274    
275                    return _sendSynchronousMessage(destinationName, message);
276            }
277    
278            private Object _sendSynchronousMessage(
279                            String destinationName, Object payload,
280                            String responseDestinationName, long timeout)
281                    throws MessageBusException {
282    
283                    PortalMessageBusPermission.checkSend(destinationName);
284    
285                    Message message = new Message();
286    
287                    message.setResponseDestinationName(responseDestinationName);
288                    message.setPayload(payload);
289    
290                    return _sendSynchronousMessage(destinationName, message, timeout);
291            }
292    
293            private void _shutdown() {
294                    PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
295    
296                    _getMessageBus().shutdown();
297            }
298    
299            private void _shutdown(boolean force) {
300                    PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
301    
302                    _getMessageBus().shutdown(force);
303            }
304    
305            private boolean _unregisterMessageListener(
306                    String destinationName, MessageListener messageListener) {
307    
308                    return _getMessageBus().unregisterMessageListener(
309                            destinationName, messageListener);
310            }
311    
312            private static final Log _log = LogFactoryUtil.getLog(MessageBusUtil.class);
313    
314            private static final MessageBusUtil _instance = new MessageBusUtil();
315    
316            private static SynchronousMessageSender.Mode _synchronousMessageSenderMode;
317    
318            private volatile boolean _initialized;
319            private final MessageBus _messageBus =
320                    ProxyFactory.newServiceTrackedInstance(MessageBus.class);
321            private final ServiceTracker<MessageBus, MessageBus> _serviceTracker;
322    
323            private class MessageBusServiceTrackerCustomizer
324                    implements ServiceTrackerCustomizer<MessageBus, MessageBus> {
325    
326                    @Override
327                    public MessageBus addingService(
328                            ServiceReference<MessageBus> serviceReference) {
329    
330                            _initialized = true;
331    
332                            return null;
333                    }
334    
335                    @Override
336                    public void modifiedService(
337                            ServiceReference<MessageBus> serviceReference,
338                            MessageBus messageBus) {
339                    }
340    
341                    @Override
342                    public void removedService(
343                            ServiceReference<MessageBus> serviceReference, MessageBus service) {
344                    }
345    
346            }
347    
348    }