001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.messaging.sender.SingleDestinationMessageSenderFactoryUtil;
018 import com.liferay.portal.kernel.messaging.sender.SynchronousMessageSender;
019 import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
020 import com.liferay.portal.kernel.security.pacl.permission.PortalRuntimePermission;
021 import com.liferay.registry.Registry;
022 import com.liferay.registry.RegistryUtil;
023 import com.liferay.registry.ServiceTracker;
024
025
029 public class MessageBusUtil {
030
031 public static void addDestination(Destination destination) {
032 getInstance()._addDestination(destination);
033 }
034
035 public static Message createResponseMessage(Message requestMessage) {
036 Message responseMessage = new Message();
037
038 responseMessage.setDestinationName(
039 requestMessage.getResponseDestinationName());
040 responseMessage.setResponseId(requestMessage.getResponseId());
041
042 return responseMessage;
043 }
044
045 public static Message createResponseMessage(
046 Message requestMessage, Object payload) {
047
048 Message responseMessage = createResponseMessage(requestMessage);
049
050 responseMessage.setPayload(payload);
051
052 return responseMessage;
053 }
054
055 public static Destination getDestination(String destinationName) {
056 return getInstance()._getDestination(destinationName);
057 }
058
059 public static MessageBusUtil getInstance() {
060 PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
061
062 return _instance;
063 }
064
065 public static MessageBus getMessageBus() {
066 return _instance._getMessageBus();
067 }
068
069 public static boolean hasMessageListener(String destination) {
070 return getInstance()._hasMessageListener(destination);
071 }
072
073 public static void registerMessageListener(
074 String destinationName, MessageListener messageListener) {
075
076 getInstance()._registerMessageListener(
077 destinationName, messageListener);
078 }
079
080 public static void removeDestination(String destinationName) {
081 getInstance()._removeDestination(destinationName);
082 }
083
084 public static void sendMessage(String destinationName, Message message) {
085 getInstance()._sendMessage(destinationName, message);
086 }
087
088 public static void sendMessage(String destinationName, Object payload) {
089 getInstance()._sendMessage(destinationName, payload);
090 }
091
092 public static Object sendSynchronousMessage(
093 String destinationName, Message message)
094 throws MessageBusException {
095
096 return getInstance()._sendSynchronousMessage(destinationName, message);
097 }
098
099 public static Object sendSynchronousMessage(
100 String destinationName, Message message, long timeout)
101 throws MessageBusException {
102
103 return getInstance()._sendSynchronousMessage(
104 destinationName, message, timeout);
105 }
106
107 public static Object sendSynchronousMessage(
108 String destinationName, Object payload)
109 throws MessageBusException {
110
111 return getInstance()._sendSynchronousMessage(
112 destinationName, payload, null);
113 }
114
115 public static Object sendSynchronousMessage(
116 String destinationName, Object payload, long timeout)
117 throws MessageBusException {
118
119 return getInstance()._sendSynchronousMessage(
120 destinationName, payload, null, timeout);
121 }
122
123 public static Object sendSynchronousMessage(
124 String destinationName, Object payload,
125 String responseDestinationName)
126 throws MessageBusException {
127
128 return getInstance()._sendSynchronousMessage(
129 destinationName, payload, responseDestinationName);
130 }
131
132 public static Object sendSynchronousMessage(
133 String destinationName, Object payload,
134 String responseDestinationName, long timeout)
135 throws MessageBusException {
136
137 return getInstance()._sendSynchronousMessage(
138 destinationName, payload, responseDestinationName, timeout);
139 }
140
141 public static void shutdown() {
142 getInstance()._shutdown();
143 }
144
145 public static void shutdown(boolean force) {
146 getInstance()._shutdown(force);
147 }
148
149 public static boolean unregisterMessageListener(
150 String destinationName, MessageListener messageListener) {
151
152 PortalMessageBusPermission.checkListen(destinationName);
153
154 return getInstance()._unregisterMessageListener(
155 destinationName, messageListener);
156 }
157
158 public MessageBusUtil() {
159 Registry registry = RegistryUtil.getRegistry();
160
161 _serviceTracker = registry.trackServices(MessageBus.class);
162
163 _serviceTracker.open();
164 }
165
166 public void setSynchronousMessageSenderMode(
167 SynchronousMessageSender.Mode synchronousMessageSenderMode) {
168
169 _synchronousMessageSenderMode = synchronousMessageSenderMode;
170 }
171
172 private void _addDestination(Destination destination) {
173 _getMessageBus().addDestination(destination);
174 }
175
176 private Destination _getDestination(String destinationName) {
177 return _getMessageBus().getDestination(destinationName);
178 }
179
180 private MessageBus _getMessageBus() {
181 try {
182 while (_serviceTracker.getService() == null) {
183 Thread.sleep(500);
184 }
185
186 return _serviceTracker.getService();
187 }
188 catch (InterruptedException e) {
189 throw new IllegalStateException(
190 "Unable to initialize MessageBusUtil", e);
191 }
192 }
193
194 private boolean _hasMessageListener(String destinationName) {
195 return _getMessageBus().hasMessageListener(destinationName);
196 }
197
198 private void _registerMessageListener(
199 String destinationName, MessageListener messageListener) {
200
201 PortalMessageBusPermission.checkListen(destinationName);
202
203 _getMessageBus().registerMessageListener(
204 destinationName, messageListener);
205 }
206
207 private void _removeDestination(String destinationName) {
208 _getMessageBus().removeDestination(destinationName);
209 }
210
211 private void _sendMessage(String destinationName, Message message) {
212 PortalMessageBusPermission.checkSend(destinationName);
213
214 _getMessageBus().sendMessage(destinationName, message);
215 }
216
217 private void _sendMessage(String destinationName, Object payload) {
218 PortalMessageBusPermission.checkSend(destinationName);
219
220 Message message = new Message();
221
222 message.setPayload(payload);
223
224 _sendMessage(destinationName, message);
225 }
226
227 private Object _sendSynchronousMessage(
228 String destinationName, Message message)
229 throws MessageBusException {
230
231 PortalMessageBusPermission.checkSend(destinationName);
232
233 SynchronousMessageSender synchronousMessageSender =
234 SingleDestinationMessageSenderFactoryUtil.
235 getSynchronousMessageSender(_synchronousMessageSenderMode);
236
237 return synchronousMessageSender.send(destinationName, message);
238 }
239
240 private Object _sendSynchronousMessage(
241 String destinationName, Message message, long timeout)
242 throws MessageBusException {
243
244 PortalMessageBusPermission.checkSend(destinationName);
245
246 SynchronousMessageSender synchronousMessageSender =
247 SingleDestinationMessageSenderFactoryUtil.
248 getSynchronousMessageSender(_synchronousMessageSenderMode);
249
250 return synchronousMessageSender.send(destinationName, message, timeout);
251 }
252
253 private Object _sendSynchronousMessage(
254 String destinationName, Object payload,
255 String responseDestinationName)
256 throws MessageBusException {
257
258 PortalMessageBusPermission.checkSend(destinationName);
259
260 Message message = new Message();
261
262 message.setResponseDestinationName(responseDestinationName);
263 message.setPayload(payload);
264
265 return _sendSynchronousMessage(destinationName, message);
266 }
267
268 private Object _sendSynchronousMessage(
269 String destinationName, Object payload,
270 String responseDestinationName, long timeout)
271 throws MessageBusException {
272
273 PortalMessageBusPermission.checkSend(destinationName);
274
275 Message message = new Message();
276
277 message.setResponseDestinationName(responseDestinationName);
278 message.setPayload(payload);
279
280 return _sendSynchronousMessage(destinationName, message, timeout);
281 }
282
283 private void _shutdown() {
284 PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
285
286 _getMessageBus().shutdown();
287 }
288
289 private void _shutdown(boolean force) {
290 PortalRuntimePermission.checkGetBeanProperty(MessageBusUtil.class);
291
292 _getMessageBus().shutdown(force);
293 }
294
295 private boolean _unregisterMessageListener(
296 String destinationName, MessageListener messageListener) {
297
298 return _getMessageBus().unregisterMessageListener(
299 destinationName, messageListener);
300 }
301
302 private static final MessageBusUtil _instance = new MessageBusUtil();
303
304 private static SynchronousMessageSender.Mode _synchronousMessageSenderMode;
305
306 private final ServiceTracker<MessageBus, MessageBus> _serviceTracker;
307
308 }