001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.messaging.sender.SingleDestinationMessageSenderFactoryUtil;
018    import com.liferay.portal.kernel.messaging.sender.SynchronousMessageSender;
019    import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
020    import com.liferay.portal.kernel.security.pacl.permission.PortalRuntimePermission;
021    import com.liferay.registry.Registry;
022    import com.liferay.registry.RegistryUtil;
023    import com.liferay.registry.ServiceTracker;
024    
025    /**
026     * @author Michael C. Han
027     * @author Raymond Aug??
028     */
029    public class MessageBusUtil {
030    
031            public static void addDestination(Destination destination) {
032                    getInstance()._addDestination(destination);
033            }
034    
035            public static Message createResponseMessage(Message requestMessage) {
036                    Message responseMessage = new Message();
037    
038                    responseMessage.setDestinationName(
039                            requestMessage.getResponseDestinationName());
040                    responseMessage.setResponseId(requestMessage.getResponseId());
041    
042                    return responseMessage;
043            }
044    
045            public static Message createResponseMessage(
046                    Message requestMessage, Object payload) {
047    
048                    Message responseMessage = createResponseMessage(requestMessage);
049    
050                    responseMessage.setPayload(payload);
051    
052                    return responseMessage;
053            }
054    
055            public static Destination getDestination(String destinationName) {
056                    return getInstance()._getDestination(destinationName);
057            }
058    
059            public static MessageBusUtil getInstance() {
060                    PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
061    
062                    return _instance;
063            }
064    
065            public static MessageBus getMessageBus() {
066                    return _instance._getMessageBus();
067            }
068    
069            public static boolean hasMessageListener(String destination) {
070                    return getInstance()._hasMessageListener(destination);
071            }
072    
073            public static void registerMessageListener(
074                    String destinationName, MessageListener messageListener) {
075    
076                    getInstance()._registerMessageListener(
077                            destinationName, messageListener);
078            }
079    
080            public static void removeDestination(String destinationName) {
081                    getInstance()._removeDestination(destinationName);
082            }
083    
084            public static void sendMessage(String destinationName, Message message) {
085                    getInstance()._sendMessage(destinationName, message);
086            }
087    
088            public static void sendMessage(String destinationName, Object payload) {
089                    getInstance()._sendMessage(destinationName, payload);
090            }
091    
092            public static Object sendSynchronousMessage(
093                            String destinationName, Message message)
094                    throws MessageBusException {
095    
096                    return getInstance()._sendSynchronousMessage(destinationName, message);
097            }
098    
099            public static Object sendSynchronousMessage(
100                            String destinationName, Message message, long timeout)
101                    throws MessageBusException {
102    
103                    return getInstance()._sendSynchronousMessage(
104                            destinationName, message, timeout);
105            }
106    
107            public static Object sendSynchronousMessage(
108                            String destinationName, Object payload)
109                    throws MessageBusException {
110    
111                    return getInstance()._sendSynchronousMessage(
112                            destinationName, payload, null);
113            }
114    
115            public static Object sendSynchronousMessage(
116                            String destinationName, Object payload, long timeout)
117                    throws MessageBusException {
118    
119                    return getInstance()._sendSynchronousMessage(
120                            destinationName, payload, null, timeout);
121            }
122    
123            public static Object sendSynchronousMessage(
124                            String destinationName, Object payload,
125                            String responseDestinationName)
126                    throws MessageBusException {
127    
128                    return getInstance()._sendSynchronousMessage(
129                            destinationName, payload, responseDestinationName);
130            }
131    
132            public static Object sendSynchronousMessage(
133                            String destinationName, Object payload,
134                            String responseDestinationName, long timeout)
135                    throws MessageBusException {
136    
137                    return getInstance()._sendSynchronousMessage(
138                            destinationName, payload, responseDestinationName, timeout);
139            }
140    
141            public static void shutdown() {
142                    getInstance()._shutdown();
143            }
144    
145            public static void shutdown(boolean force) {
146                    getInstance()._shutdown(force);
147            }
148    
149            public static boolean unregisterMessageListener(
150                    String destinationName, MessageListener messageListener) {
151    
152                    PortalMessageBusPermission.checkListen(destinationName);
153    
154                    return getInstance()._unregisterMessageListener(
155                            destinationName, messageListener);
156            }
157    
158            public MessageBusUtil() {
159                    Registry registry = RegistryUtil.getRegistry();
160    
161                    _serviceTracker = registry.trackServices(MessageBus.class);
162    
163                    _serviceTracker.open();
164            }
165    
166            public void setSynchronousMessageSenderMode(
167                    SynchronousMessageSender.Mode synchronousMessageSenderMode) {
168    
169                    _synchronousMessageSenderMode = synchronousMessageSenderMode;
170            }
171    
172            private void _addDestination(Destination destination) {
173                    _getMessageBus().addDestination(destination);
174            }
175    
176            private Destination _getDestination(String destinationName) {
177                    return _getMessageBus().getDestination(destinationName);
178            }
179    
180            private MessageBus _getMessageBus() {
181                    try {
182                            while (_serviceTracker.getService() == null) {
183                                    Thread.sleep(500);
184                            }
185    
186                            return _serviceTracker.getService();
187                    }
188                    catch (InterruptedException e) {
189                            throw new IllegalStateException(
190                                    "Unable to initialize MessageBusUtil", e);
191                    }
192            }
193    
194            private boolean _hasMessageListener(String destinationName) {
195                    return _getMessageBus().hasMessageListener(destinationName);
196            }
197    
198            private void _registerMessageListener(
199                    String destinationName, MessageListener messageListener) {
200    
201                    PortalMessageBusPermission.checkListen(destinationName);
202    
203                    _getMessageBus().registerMessageListener(
204                            destinationName, messageListener);
205            }
206    
207            private void _removeDestination(String destinationName) {
208                    _getMessageBus().removeDestination(destinationName);
209            }
210    
211            private void _sendMessage(String destinationName, Message message) {
212                    PortalMessageBusPermission.checkSend(destinationName);
213    
214                    _getMessageBus().sendMessage(destinationName, message);
215            }
216    
217            private void _sendMessage(String destinationName, Object payload) {
218                    PortalMessageBusPermission.checkSend(destinationName);
219    
220                    Message message = new Message();
221    
222                    message.setPayload(payload);
223    
224                    _sendMessage(destinationName, message);
225            }
226    
227            private Object _sendSynchronousMessage(
228                            String destinationName, Message message)
229                    throws MessageBusException {
230    
231                    PortalMessageBusPermission.checkSend(destinationName);
232    
233                    SynchronousMessageSender synchronousMessageSender =
234                            SingleDestinationMessageSenderFactoryUtil.
235                                    getSynchronousMessageSender(_synchronousMessageSenderMode);
236    
237                    return synchronousMessageSender.send(destinationName, message);
238            }
239    
240            private Object _sendSynchronousMessage(
241                            String destinationName, Message message, long timeout)
242                    throws MessageBusException {
243    
244                    PortalMessageBusPermission.checkSend(destinationName);
245    
246                    SynchronousMessageSender synchronousMessageSender =
247                            SingleDestinationMessageSenderFactoryUtil.
248                                    getSynchronousMessageSender(_synchronousMessageSenderMode);
249    
250                    return synchronousMessageSender.send(destinationName, message, timeout);
251            }
252    
253            private Object _sendSynchronousMessage(
254                            String destinationName, Object payload,
255                            String responseDestinationName)
256                    throws MessageBusException {
257    
258                    PortalMessageBusPermission.checkSend(destinationName);
259    
260                    Message message = new Message();
261    
262                    message.setResponseDestinationName(responseDestinationName);
263                    message.setPayload(payload);
264    
265                    return _sendSynchronousMessage(destinationName, message);
266            }
267    
268            private Object _sendSynchronousMessage(
269                            String destinationName, Object payload,
270                            String responseDestinationName, long timeout)
271                    throws MessageBusException {
272    
273                    PortalMessageBusPermission.checkSend(destinationName);
274    
275                    Message message = new Message();
276    
277                    message.setResponseDestinationName(responseDestinationName);
278                    message.setPayload(payload);
279    
280                    return _sendSynchronousMessage(destinationName, message, timeout);
281            }
282    
283            private void _shutdown() {
284                    PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
285    
286                    _getMessageBus().shutdown();
287            }
288    
289            private void _shutdown(boolean force) {
290                    PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
291    
292                    _getMessageBus().shutdown(force);
293            }
294    
295            private boolean _unregisterMessageListener(
296                    String destinationName, MessageListener messageListener) {
297    
298                    return _getMessageBus().unregisterMessageListener(
299                            destinationName, messageListener);
300            }
301    
302            private static final MessageBusUtil _instance = new MessageBusUtil();
303    
304            private static SynchronousMessageSender.Mode _synchronousMessageSenderMode;
305    
306            private final ServiceTracker<MessageBus, MessageBus> _serviceTracker;
307    
308    }