001
014
015 package com.liferay.portal.kernel.messaging.config;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.messaging.Destination;
020 import com.liferay.portal.kernel.messaging.DestinationConfiguration;
021 import com.liferay.portal.kernel.messaging.DestinationEventListener;
022 import com.liferay.portal.kernel.messaging.DestinationFactory;
023 import com.liferay.portal.kernel.messaging.MessageBus;
024 import com.liferay.portal.kernel.messaging.MessageBusEventListener;
025 import com.liferay.portal.kernel.messaging.MessageListener;
026 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
027 import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
028 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
029 import com.liferay.portal.kernel.resiliency.spi.SPI;
030 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
031 import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
032 import com.liferay.portal.kernel.util.ClassLoaderPool;
033 import com.liferay.portal.kernel.util.StringBundler;
034 import com.liferay.registry.Filter;
035 import com.liferay.registry.Registry;
036 import com.liferay.registry.RegistryUtil;
037 import com.liferay.registry.ServiceFinalizer;
038 import com.liferay.registry.ServiceReference;
039 import com.liferay.registry.ServiceRegistrar;
040 import com.liferay.registry.dependency.ServiceDependencyListener;
041 import com.liferay.registry.dependency.ServiceDependencyManager;
042
043 import java.lang.reflect.Method;
044
045 import java.util.ArrayList;
046 import java.util.HashMap;
047 import java.util.HashSet;
048 import java.util.List;
049 import java.util.Map;
050 import java.util.Set;
051
052
055 public abstract class AbstractMessagingConfigurator
056 implements MessagingConfigurator {
057
058 public void afterPropertiesSet() {
059 final ServiceDependencyManager serviceDependencyManager =
060 new ServiceDependencyManager();
061
062 serviceDependencyManager.addServiceDependencyListener(
063 new ServiceDependencyListener() {
064
065 @Override
066 public void dependenciesFulfilled() {
067 Registry registry = RegistryUtil.getRegistry();
068
069 _messageBus = registry.getService(MessageBus.class);
070
071 initialize();
072 }
073
074 @Override
075 public void destroy() {
076 }
077
078 });
079
080 serviceDependencyManager.registerDependencies(
081 DestinationFactory.class, MessageBus.class);
082 }
083
084 @Override
085 public void connect() {
086 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
087 return;
088 }
089
090 Registry registry = RegistryUtil.getRegistry();
091
092 _messageListenerServiceRegistrar = registry.getServiceRegistrar(
093 MessageListener.class);
094
095 Thread currentThread = Thread.currentThread();
096
097 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
098
099 try {
100 ClassLoader operatingClassLoader = getOperatingClassloader();
101
102 currentThread.setContextClassLoader(operatingClassLoader);
103
104 for (Map.Entry<String, List<MessageListener>> messageListeners :
105 _messageListeners.entrySet()) {
106
107 String destinationName = messageListeners.getKey();
108
109 ServiceDependencyManager serviceDependencyManager =
110 new ServiceDependencyManager();
111
112 serviceDependencyManager.addServiceDependencyListener(
113 new DestinationServiceDependencyListener(
114 destinationName, messageListeners.getValue()));
115
116 Filter filter = registry.getFilter(
117 "(&(destination.name=" + destinationName +
118 ")(objectClass=" + Destination.class.getName() + "))");
119
120 serviceDependencyManager.registerDependencies(filter);
121 }
122 }
123 finally {
124 currentThread.setContextClassLoader(contextClassLoader);
125 }
126 }
127
128 @Override
129 public void destroy() {
130 disconnect();
131
132 _messageBusEventListeners.clear();
133
134 if (_messageBusEventListenerServiceRegistrar != null) {
135 _messageBusEventListenerServiceRegistrar.destroy();
136 }
137
138 _destinationConfigurations.clear();
139
140 if (_destinationConfigurationServiceRegistrar != null) {
141 _destinationConfigurationServiceRegistrar.destroy();
142 }
143
144 _destinations.clear();
145
146 if (_destinationServiceRegistrar != null) {
147 _destinationServiceRegistrar.destroy(
148 new ServiceFinalizer<Destination>() {
149
150 @Override
151 public void finalize(
152 ServiceReference<Destination> serviceReference,
153 Destination destination) {
154
155 destination.close();
156
157 destination.removeDestinationEventListeners();
158
159 destination.unregisterMessageListeners();
160 }
161
162 });
163 }
164
165 _messageListeners.clear();
166
167 if (_messageListenerServiceRegistrar != null) {
168 _messageListenerServiceRegistrar.destroy();
169 }
170
171 _destinationEventListeners.clear();
172
173 if (_destinationEventListenerServiceRegistrar != null) {
174 _destinationEventListenerServiceRegistrar.destroy();
175 }
176
177 ClassLoader operatingClassLoader = getOperatingClassloader();
178
179 String servletContextName = ClassLoaderPool.getContextName(
180 operatingClassLoader);
181
182 MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
183 servletContextName, this);
184 }
185
186 @Override
187 public void disconnect() {
188 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
189 return;
190 }
191
192 for (Map.Entry<String, List<MessageListener>> messageListeners :
193 _messageListeners.entrySet()) {
194
195 String destinationName = messageListeners.getKey();
196
197 for (MessageListener messageListener :
198 messageListeners.getValue()) {
199
200 _messageBus.unregisterMessageListener(
201 destinationName, messageListener);
202 }
203 }
204 }
205
206 @Override
207 public void setDestinationConfigurations(
208 Set<DestinationConfiguration> destinationConfigurations) {
209
210 _destinationConfigurations.addAll(destinationConfigurations);
211 }
212
213 @Override
214 public void setDestinationEventListeners(
215 Map<String, List<DestinationEventListener>> destinationEventListeners) {
216
217 _destinationEventListeners.putAll(destinationEventListeners);
218 }
219
220 @Override
221 public void setDestinations(List<Destination> destinations) {
222 _destinations.addAll(destinations);
223 }
224
225 @Override
226 public void setMessageBusEventListeners(
227 List<MessageBusEventListener> messageBusEventListeners) {
228
229 _messageBusEventListeners.addAll(messageBusEventListeners);
230 }
231
232 @Override
233 public void setMessageListeners(
234 Map<String, List<MessageListener>> messageListeners) {
235
236 for (List<MessageListener> messageListenersList :
237 messageListeners.values()) {
238
239 for (MessageListener messageListener : messageListenersList) {
240 Class<?> messageListenerClass = messageListener.getClass();
241
242 try {
243 Method setMessageBusMethod = messageListenerClass.getMethod(
244 "setMessageBus", MessageBus.class);
245
246 setMessageBusMethod.setAccessible(true);
247
248 setMessageBusMethod.invoke(messageListener, _messageBus);
249
250 continue;
251 }
252 catch (Exception e) {
253 }
254
255 try {
256 Method setMessageBusMethod =
257 messageListenerClass.getDeclaredMethod(
258 "setMessageBus", MessageBus.class);
259
260 setMessageBusMethod.setAccessible(true);
261
262 setMessageBusMethod.invoke(messageListener, _messageBus);
263 }
264 catch (Exception e) {
265 }
266 }
267 }
268
269 _messageListeners.putAll(messageListeners);
270 }
271
272
277 @Deprecated
278 @Override
279 public void setReplacementDestinations(
280 List<Destination> replacementDestinations) {
281
282 _destinations.addAll(replacementDestinations);
283 }
284
285 protected abstract ClassLoader getOperatingClassloader();
286
287 protected void initialize() {
288 Thread currentThread = Thread.currentThread();
289
290 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
291
292 ClassLoader operatingClassLoader = getOperatingClassloader();
293
294 if (contextClassLoader == operatingClassLoader) {
295 _portalMessagingConfigurator = true;
296 }
297
298 registerMessageBusEventListeners();
299
300 registerDestinationConfigurations();
301
302 registerDestinations();
303
304 registerDestinationEventListeners();
305
306 connect();
307
308 String servletContextName = ClassLoaderPool.getContextName(
309 operatingClassLoader);
310
311 MessagingConfiguratorRegistry.registerMessagingConfigurator(
312 servletContextName, this);
313 }
314
315 protected void registerDestinationConfigurations() {
316 if (_destinationConfigurations.isEmpty()) {
317 return;
318 }
319
320 Registry registry = RegistryUtil.getRegistry();
321
322 _destinationConfigurationServiceRegistrar =
323 registry.getServiceRegistrar(DestinationConfiguration.class);
324
325 for (DestinationConfiguration destinationConfiguration :
326 _destinationConfigurations) {
327
328 try {
329 PortalMessageBusPermission.checkListen(
330 destinationConfiguration.getDestinationName());
331 }
332 catch (SecurityException se) {
333 if (_log.isInfoEnabled()) {
334 _log.info(
335 "Rejecting destination " +
336 destinationConfiguration.getDestinationName());
337 }
338
339 continue;
340 }
341
342 Map<String, Object> properties = new HashMap<>();
343
344 properties.put(
345 "destination.name",
346 destinationConfiguration.getDestinationName());
347
348 _destinationConfigurationServiceRegistrar.registerService(
349 DestinationConfiguration.class, destinationConfiguration,
350 properties);
351 }
352 }
353
354 protected void registerDestinationEventListeners() {
355 if (_destinationEventListeners.isEmpty()) {
356 return;
357 }
358
359 Registry registry = RegistryUtil.getRegistry();
360
361 _destinationEventListenerServiceRegistrar =
362 registry.getServiceRegistrar(DestinationEventListener.class);
363
364 for (final Map.Entry<String, List<DestinationEventListener>> entry :
365 _destinationEventListeners.entrySet()) {
366
367 final String destinationName = entry.getKey();
368
369 ServiceDependencyManager serviceDependencyManager =
370 new ServiceDependencyManager();
371
372 serviceDependencyManager.addServiceDependencyListener(
373 new ServiceDependencyListener() {
374
375 @Override
376 public void dependenciesFulfilled() {
377 Map<String, Object> properties = new HashMap<>();
378
379 properties.put("destination.name", destinationName);
380
381 for (DestinationEventListener destinationEventListener :
382 entry.getValue()) {
383
384 _destinationEventListenerServiceRegistrar.
385 registerService(
386 DestinationEventListener.class,
387 destinationEventListener, properties);
388 }
389 }
390
391 @Override
392 public void destroy() {
393 }
394
395 });
396
397 Filter filter = registry.getFilter(
398 "(&(destination.name=" + destinationName + ")(objectClass=" +
399 Destination.class.getName() + "))");
400
401 serviceDependencyManager.registerDependencies(filter);
402 }
403 }
404
405 protected void registerDestinations() {
406 if (_destinations.isEmpty()) {
407 return;
408 }
409
410 Registry registry = RegistryUtil.getRegistry();
411
412 _destinationServiceRegistrar = registry.getServiceRegistrar(
413 Destination.class);
414
415 for (Destination destination : _destinations) {
416 String destinationName = destination.getName();
417
418 try {
419 PortalMessageBusPermission.checkListen(destinationName);
420 }
421 catch (SecurityException se) {
422 if (_log.isInfoEnabled()) {
423 _log.info("Rejecting destination " + destinationName);
424 }
425
426 continue;
427 }
428
429 Map<String, Object> properties = new HashMap<>();
430
431 properties.put("destination.name", destinationName);
432
433 _destinationServiceRegistrar.registerService(
434 Destination.class, destination, properties);
435 }
436 }
437
438 protected void registerMessageBusEventListeners() {
439 if (_messageBusEventListeners.isEmpty()) {
440 return;
441 }
442
443 Registry registry = RegistryUtil.getRegistry();
444
445 _messageBusEventListenerServiceRegistrar = registry.getServiceRegistrar(
446 MessageBusEventListener.class);
447
448 for (MessageBusEventListener messageBusEventListener :
449 _messageBusEventListeners) {
450
451 _messageBusEventListenerServiceRegistrar.registerService(
452 MessageBusEventListener.class, messageBusEventListener);
453 }
454 }
455
456 private static final Log _log = LogFactoryUtil.getLog(
457 AbstractMessagingConfigurator.class);
458
459 private final Set<DestinationConfiguration> _destinationConfigurations =
460 new HashSet<>();
461 private ServiceRegistrar<DestinationConfiguration>
462 _destinationConfigurationServiceRegistrar;
463 private final Map<String, List<DestinationEventListener>>
464 _destinationEventListeners = new HashMap<>();
465 private ServiceRegistrar<DestinationEventListener>
466 _destinationEventListenerServiceRegistrar;
467 private final List<Destination> _destinations = new ArrayList<>();
468 private ServiceRegistrar<Destination> _destinationServiceRegistrar;
469 private volatile MessageBus _messageBus;
470 private final List<MessageBusEventListener> _messageBusEventListeners =
471 new ArrayList<>();
472 private ServiceRegistrar<MessageBusEventListener>
473 _messageBusEventListenerServiceRegistrar;
474 private final Map<String, List<MessageListener>> _messageListeners =
475 new HashMap<>();
476 private ServiceRegistrar<MessageListener> _messageListenerServiceRegistrar;
477 private boolean _portalMessagingConfigurator;
478
479 private class DestinationServiceDependencyListener
480 implements ServiceDependencyListener {
481
482 public DestinationServiceDependencyListener(
483 String destinationName, List<MessageListener> messageListeners) {
484
485 _destinationName = destinationName;
486 _messageListeners = messageListeners;
487 }
488
489 @Override
490 public void dependenciesFulfilled() {
491 ClassLoader operatingClassLoader = getOperatingClassloader();
492
493 if (SPIUtil.isSPI()) {
494 SPI spi = SPIUtil.getSPI();
495
496 try {
497 RegistrationReference registrationReference =
498 spi.getRegistrationReference();
499
500 IntrabandRPCUtil.execute(
501 registrationReference,
502 new DestinationConfigurationProcessCallable(
503 _destinationName));
504 }
505 catch (Exception e) {
506 StringBundler sb = new StringBundler(4);
507
508 sb.append("Unable to install ");
509 sb.append(
510 DestinationConfigurationProcessCallable.class.
511 getName());
512 sb.append(" on MPI for ");
513 sb.append(_destinationName);
514
515 _log.error(sb.toString(), e);
516 }
517 }
518
519 Map<String, Object> properties = new HashMap<>();
520
521 properties.put("destination.name", _destinationName);
522 properties.put(
523 "message.listener.operating.class.loader",
524 operatingClassLoader);
525
526 for (MessageListener messageListener : _messageListeners) {
527 _messageListenerServiceRegistrar.registerService(
528 MessageListener.class, messageListener, properties);
529 }
530 }
531
532 @Override
533 public void destroy() {
534 }
535
536 private final String _destinationName;
537 private final List<MessageListener> _messageListeners;
538
539 }
540
541 }