001
014
015 package com.liferay.portal.kernel.messaging.config;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.messaging.Destination;
020 import com.liferay.portal.kernel.messaging.DestinationConfiguration;
021 import com.liferay.portal.kernel.messaging.DestinationEventListener;
022 import com.liferay.portal.kernel.messaging.DestinationFactory;
023 import com.liferay.portal.kernel.messaging.DestinationFactoryUtil;
024 import com.liferay.portal.kernel.messaging.MessageBus;
025 import com.liferay.portal.kernel.messaging.MessageBusEventListener;
026 import com.liferay.portal.kernel.messaging.MessageListener;
027 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
028 import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
029 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
030 import com.liferay.portal.kernel.resiliency.spi.SPI;
031 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
032 import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
033 import com.liferay.portal.kernel.util.ClassLoaderPool;
034 import com.liferay.portal.kernel.util.StringBundler;
035 import com.liferay.portal.kernel.util.StringPool;
036 import com.liferay.registry.Filter;
037 import com.liferay.registry.Registry;
038 import com.liferay.registry.RegistryUtil;
039 import com.liferay.registry.ServiceRegistrar;
040 import com.liferay.registry.dependency.ServiceDependencyListener;
041 import com.liferay.registry.dependency.ServiceDependencyManager;
042
043 import java.lang.reflect.Method;
044
045 import java.util.ArrayList;
046 import java.util.HashMap;
047 import java.util.HashSet;
048 import java.util.List;
049 import java.util.Map;
050 import java.util.Set;
051
052
055 public abstract class AbstractMessagingConfigurator
056 implements MessagingConfigurator {
057
058 public void afterPropertiesSet() {
059 final ServiceDependencyManager serviceDependencyManager =
060 new ServiceDependencyManager();
061
062 serviceDependencyManager.addServiceDependencyListener(
063 new ServiceDependencyListener() {
064
065 @Override
066 public void dependenciesFulfilled() {
067 Registry registry = RegistryUtil.getRegistry();
068
069 _messageBus = registry.getService(MessageBus.class);
070
071 initialize();
072 }
073
074 @Override
075 public void destroy() {
076 }
077
078 });
079
080 serviceDependencyManager.registerDependencies(
081 DestinationFactory.class, MessageBus.class);
082 }
083
084 @Override
085 public void connect() {
086 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
087 return;
088 }
089
090 Registry registry = RegistryUtil.getRegistry();
091
092 _messageListenerServiceRegistrar = registry.getServiceRegistrar(
093 MessageListener.class);
094
095 Thread currentThread = Thread.currentThread();
096
097 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
098
099 try {
100 ClassLoader operatingClassLoader = getOperatingClassloader();
101
102 currentThread.setContextClassLoader(operatingClassLoader);
103
104 for (Map.Entry<String, List<MessageListener>> messageListeners :
105 _messageListeners.entrySet()) {
106
107 String destinationName = messageListeners.getKey();
108
109 ServiceDependencyManager serviceDependencyManager =
110 new ServiceDependencyManager();
111
112 serviceDependencyManager.addServiceDependencyListener(
113 new DestinationServiceDependencyListener(
114 destinationName, messageListeners.getValue()));
115
116 Filter filter = registry.getFilter(
117 "(&(destination.name=" + destinationName +
118 ")(objectClass=" + Destination.class.getName() + "))");
119
120 serviceDependencyManager.registerDependencies(filter);
121 }
122 }
123 finally {
124 currentThread.setContextClassLoader(contextClassLoader);
125 }
126 }
127
128 @Override
129 public void destroy() {
130 if (_messageListenerServiceRegistrar != null) {
131 _messageListenerServiceRegistrar.destroy();
132 }
133
134 if (_destinationEventListenerServiceRegistrar != null) {
135 _destinationEventListenerServiceRegistrar.destroy();
136 }
137
138 if (_destinationServiceRegistrar != null) {
139 _destinationServiceRegistrar.destroy();
140 }
141
142 if (_messageBusEventListenerServiceRegistrar != null) {
143 _messageBusEventListenerServiceRegistrar.destroy();
144 }
145
146 _destinationConfigurations.clear();
147 _destinationEventListeners.clear();
148 _messageListeners.clear();
149
150 for (Destination destination : _destinations) {
151 destination.destroy();
152 }
153
154 _destinations.clear();
155 _messageBusEventListeners.clear();
156
157 ClassLoader operatingClassLoader = getOperatingClassloader();
158
159 String servletContextName = ClassLoaderPool.getContextName(
160 operatingClassLoader);
161
162 MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
163 servletContextName, this);
164 }
165
166 @Override
167 public void disconnect() {
168 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
169 return;
170 }
171
172 for (Map.Entry<String, List<MessageListener>> messageListeners :
173 _messageListeners.entrySet()) {
174
175 String destinationName = messageListeners.getKey();
176
177 for (MessageListener messageListener :
178 messageListeners.getValue()) {
179
180 _messageBus.unregisterMessageListener(
181 destinationName, messageListener);
182 }
183 }
184 }
185
186 @Override
187 public void setDestinationConfigurations(
188 Set<DestinationConfiguration> destinationConfigurations) {
189
190 _destinationConfigurations.addAll(destinationConfigurations);
191 }
192
193 @Override
194 public void setDestinationEventListeners(
195 Map<String, List<DestinationEventListener>> destinationEventListeners) {
196
197 _destinationEventListeners.putAll(destinationEventListeners);
198 }
199
200 @Override
201 public void setDestinations(List<Destination> destinations) {
202 _destinations.addAll(destinations);
203 }
204
205 @Override
206 public void setMessageBusEventListeners(
207 List<MessageBusEventListener> messageBusEventListeners) {
208
209 _messageBusEventListeners.addAll(messageBusEventListeners);
210 }
211
212 @Override
213 public void setMessageListeners(
214 Map<String, List<MessageListener>> messageListeners) {
215
216 for (List<MessageListener> messageListenersList :
217 messageListeners.values()) {
218
219 for (MessageListener messageListener : messageListenersList) {
220 Class<?> messageListenerClass = messageListener.getClass();
221
222 try {
223 Method setMessageBusMethod = messageListenerClass.getMethod(
224 "setMessageBus", MessageBus.class);
225
226 setMessageBusMethod.setAccessible(true);
227
228 setMessageBusMethod.invoke(messageListener, _messageBus);
229
230 continue;
231 }
232 catch (Exception e) {
233 }
234
235 try {
236 Method setMessageBusMethod =
237 messageListenerClass.getDeclaredMethod(
238 "setMessageBus", MessageBus.class);
239
240 setMessageBusMethod.setAccessible(true);
241
242 setMessageBusMethod.invoke(messageListener, _messageBus);
243 }
244 catch (Exception e) {
245 }
246 }
247 }
248
249 _messageListeners.putAll(messageListeners);
250 }
251
252
256 @Deprecated
257 @Override
258 public void setReplacementDestinations(
259 List<Destination> replacementDestinations) {
260
261 _destinations.addAll(replacementDestinations);
262 }
263
264 protected abstract ClassLoader getOperatingClassloader();
265
266 protected void initialize() {
267 Thread currentThread = Thread.currentThread();
268
269 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
270
271 ClassLoader operatingClassLoader = getOperatingClassloader();
272
273 if (contextClassLoader == operatingClassLoader) {
274 _portalMessagingConfigurator = true;
275 }
276
277 registerMessageBusEventListeners();
278
279 registerDestinations();
280
281 registerDestinationEventListeners();
282
283 connect();
284
285 String servletContextName = ClassLoaderPool.getContextName(
286 operatingClassLoader);
287
288 if ((servletContextName != null) &&
289 !servletContextName.equals(StringPool.NULL)) {
290
291 MessagingConfiguratorRegistry.registerMessagingConfigurator(
292 servletContextName, this);
293 }
294 }
295
296 protected void registerDestinationEventListeners() {
297 if (_destinationEventListeners.isEmpty()) {
298 return;
299 }
300
301 Registry registry = RegistryUtil.getRegistry();
302
303 _destinationEventListenerServiceRegistrar =
304 registry.getServiceRegistrar(DestinationEventListener.class);
305
306 for (final Map.Entry<String, List<DestinationEventListener>> entry :
307 _destinationEventListeners.entrySet()) {
308
309 final String destinationName = entry.getKey();
310
311 ServiceDependencyManager serviceDependencyManager =
312 new ServiceDependencyManager();
313
314 serviceDependencyManager.addServiceDependencyListener(
315 new ServiceDependencyListener() {
316
317 @Override
318 public void dependenciesFulfilled() {
319 Map<String, Object> properties = new HashMap<>();
320
321 properties.put("destination.name", destinationName);
322
323 for (DestinationEventListener destinationEventListener :
324 entry.getValue()) {
325
326 _destinationEventListenerServiceRegistrar.
327 registerService(
328 DestinationEventListener.class,
329 destinationEventListener, properties);
330 }
331 }
332
333 @Override
334 public void destroy() {
335 }
336
337 });
338
339 Filter filter = registry.getFilter(
340 "(&(destination.name=" + destinationName + ")(objectClass=" +
341 Destination.class.getName() + "))");
342
343 serviceDependencyManager.registerDependencies(filter);
344 }
345 }
346
347 protected void registerDestinations() {
348 for (DestinationConfiguration destinationConfiguration :
349 _destinationConfigurations) {
350
351 try {
352 PortalMessageBusPermission.checkListen(
353 destinationConfiguration.getDestinationName());
354 }
355 catch (SecurityException se) {
356 if (_log.isInfoEnabled()) {
357 _log.info(
358 "Rejecting destination " +
359 destinationConfiguration.getDestinationName());
360 }
361
362 continue;
363 }
364
365 _destinations.add(
366 DestinationFactoryUtil.createDestination(
367 destinationConfiguration));
368 }
369
370 if (_destinations.isEmpty()) {
371 return;
372 }
373
374 Registry registry = RegistryUtil.getRegistry();
375
376 _destinationServiceRegistrar = registry.getServiceRegistrar(
377 Destination.class);
378
379 for (Destination destination : _destinations) {
380 String destinationName = destination.getName();
381
382 try {
383 PortalMessageBusPermission.checkListen(destinationName);
384 }
385 catch (SecurityException se) {
386 if (_log.isInfoEnabled()) {
387 _log.info("Rejecting destination " + destinationName);
388 }
389
390 continue;
391 }
392
393 Map<String, Object> properties = new HashMap<>();
394
395 properties.put("destination.name", destinationName);
396
397 _destinationServiceRegistrar.registerService(
398 Destination.class, destination, properties);
399 }
400 }
401
402 protected void registerMessageBusEventListeners() {
403 if (_messageBusEventListeners.isEmpty()) {
404 return;
405 }
406
407 Registry registry = RegistryUtil.getRegistry();
408
409 _messageBusEventListenerServiceRegistrar = registry.getServiceRegistrar(
410 MessageBusEventListener.class);
411
412 for (MessageBusEventListener messageBusEventListener :
413 _messageBusEventListeners) {
414
415 _messageBusEventListenerServiceRegistrar.registerService(
416 MessageBusEventListener.class, messageBusEventListener);
417 }
418 }
419
420 private static final Log _log = LogFactoryUtil.getLog(
421 AbstractMessagingConfigurator.class);
422
423 private final Set<DestinationConfiguration> _destinationConfigurations =
424 new HashSet<>();
425 private final Map<String, List<DestinationEventListener>>
426 _destinationEventListeners = new HashMap<>();
427 private ServiceRegistrar<DestinationEventListener>
428 _destinationEventListenerServiceRegistrar;
429 private final List<Destination> _destinations = new ArrayList<>();
430 private ServiceRegistrar<Destination> _destinationServiceRegistrar;
431 private volatile MessageBus _messageBus;
432 private final List<MessageBusEventListener> _messageBusEventListeners =
433 new ArrayList<>();
434 private ServiceRegistrar<MessageBusEventListener>
435 _messageBusEventListenerServiceRegistrar;
436 private final Map<String, List<MessageListener>> _messageListeners =
437 new HashMap<>();
438 private ServiceRegistrar<MessageListener> _messageListenerServiceRegistrar;
439 private boolean _portalMessagingConfigurator;
440
441 private class DestinationServiceDependencyListener
442 implements ServiceDependencyListener {
443
444 public DestinationServiceDependencyListener(
445 String destinationName, List<MessageListener> messageListeners) {
446
447 _destinationName = destinationName;
448 _messageListeners = messageListeners;
449 }
450
451 @Override
452 public void dependenciesFulfilled() {
453 ClassLoader operatingClassLoader = getOperatingClassloader();
454
455 if (SPIUtil.isSPI()) {
456 SPI spi = SPIUtil.getSPI();
457
458 try {
459 RegistrationReference registrationReference =
460 spi.getRegistrationReference();
461
462 IntrabandRPCUtil.execute(
463 registrationReference,
464 new DestinationConfigurationProcessCallable(
465 _destinationName));
466 }
467 catch (Exception e) {
468 StringBundler sb = new StringBundler(4);
469
470 sb.append("Unable to install ");
471 sb.append(
472 DestinationConfigurationProcessCallable.class.
473 getName());
474 sb.append(" on MPI for ");
475 sb.append(_destinationName);
476
477 _log.error(sb.toString(), e);
478 }
479 }
480
481 Map<String, Object> properties = new HashMap<>();
482
483 properties.put("destination.name", _destinationName);
484 properties.put(
485 "message.listener.operating.class.loader",
486 operatingClassLoader);
487
488 for (MessageListener messageListener : _messageListeners) {
489 _messageListenerServiceRegistrar.registerService(
490 MessageListener.class, messageListener, properties);
491 }
492 }
493
494 @Override
495 public void destroy() {
496 }
497
498 private final String _destinationName;
499 private final List<MessageListener> _messageListeners;
500
501 }
502
503 }