001
014
015 package com.liferay.portal.kernel.messaging.config;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.messaging.Destination;
020 import com.liferay.portal.kernel.messaging.DestinationConfiguration;
021 import com.liferay.portal.kernel.messaging.DestinationEventListener;
022 import com.liferay.portal.kernel.messaging.DestinationFactory;
023 import com.liferay.portal.kernel.messaging.MessageBus;
024 import com.liferay.portal.kernel.messaging.MessageBusEventListener;
025 import com.liferay.portal.kernel.messaging.MessageListener;
026 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
027 import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
028 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
029 import com.liferay.portal.kernel.resiliency.spi.SPI;
030 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
031 import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
032 import com.liferay.portal.kernel.util.ClassLoaderPool;
033 import com.liferay.portal.kernel.util.StringBundler;
034 import com.liferay.portal.kernel.util.StringPool;
035 import com.liferay.registry.Filter;
036 import com.liferay.registry.Registry;
037 import com.liferay.registry.RegistryUtil;
038 import com.liferay.registry.ServiceFinalizer;
039 import com.liferay.registry.ServiceReference;
040 import com.liferay.registry.ServiceRegistrar;
041 import com.liferay.registry.dependency.ServiceDependencyListener;
042 import com.liferay.registry.dependency.ServiceDependencyManager;
043
044 import java.lang.reflect.Method;
045
046 import java.util.ArrayList;
047 import java.util.HashMap;
048 import java.util.HashSet;
049 import java.util.List;
050 import java.util.Map;
051 import java.util.Set;
052
053
056 public abstract class AbstractMessagingConfigurator
057 implements MessagingConfigurator {
058
059 public void afterPropertiesSet() {
060 final ServiceDependencyManager serviceDependencyManager =
061 new ServiceDependencyManager();
062
063 serviceDependencyManager.addServiceDependencyListener(
064 new ServiceDependencyListener() {
065
066 @Override
067 public void dependenciesFulfilled() {
068 Registry registry = RegistryUtil.getRegistry();
069
070 _messageBus = registry.getService(MessageBus.class);
071
072 initialize();
073 }
074
075 @Override
076 public void destroy() {
077 }
078
079 });
080
081 serviceDependencyManager.registerDependencies(
082 DestinationFactory.class, MessageBus.class);
083 }
084
085 @Override
086 public void connect() {
087 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
088 return;
089 }
090
091 Registry registry = RegistryUtil.getRegistry();
092
093 _messageListenerServiceRegistrar = registry.getServiceRegistrar(
094 MessageListener.class);
095
096 Thread currentThread = Thread.currentThread();
097
098 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
099
100 try {
101 ClassLoader operatingClassLoader = getOperatingClassloader();
102
103 currentThread.setContextClassLoader(operatingClassLoader);
104
105 for (Map.Entry<String, List<MessageListener>> messageListeners :
106 _messageListeners.entrySet()) {
107
108 String destinationName = messageListeners.getKey();
109
110 ServiceDependencyManager serviceDependencyManager =
111 new ServiceDependencyManager();
112
113 serviceDependencyManager.addServiceDependencyListener(
114 new DestinationServiceDependencyListener(
115 destinationName, messageListeners.getValue()));
116
117 Filter filter = registry.getFilter(
118 "(&(destination.name=" + destinationName +
119 ")(objectClass=" + Destination.class.getName() + "))");
120
121 serviceDependencyManager.registerDependencies(filter);
122 }
123 }
124 finally {
125 currentThread.setContextClassLoader(contextClassLoader);
126 }
127 }
128
129 @Override
130 public void destroy() {
131 disconnect();
132
133 _messageBusEventListeners.clear();
134
135 if (_messageBusEventListenerServiceRegistrar != null) {
136 _messageBusEventListenerServiceRegistrar.destroy();
137 }
138
139 _destinationConfigurations.clear();
140
141 if (_destinationConfigurationServiceRegistrar != null) {
142 _destinationConfigurationServiceRegistrar.destroy();
143 }
144
145 _destinations.clear();
146
147 if (_destinationServiceRegistrar != null) {
148 _destinationServiceRegistrar.destroy(
149 new ServiceFinalizer<Destination>() {
150
151 @Override
152 public void finalize(
153 ServiceReference<Destination> serviceReference,
154 Destination destination) {
155
156 destination.close();
157
158 destination.removeDestinationEventListeners();
159
160 destination.unregisterMessageListeners();
161 }
162
163 });
164 }
165
166 _messageListeners.clear();
167
168 if (_messageListenerServiceRegistrar != null) {
169 _messageListenerServiceRegistrar.destroy();
170 }
171
172 _destinationEventListeners.clear();
173
174 if (_destinationEventListenerServiceRegistrar != null) {
175 _destinationEventListenerServiceRegistrar.destroy();
176 }
177
178 ClassLoader operatingClassLoader = getOperatingClassloader();
179
180 String servletContextName = ClassLoaderPool.getContextName(
181 operatingClassLoader);
182
183 MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
184 servletContextName, this);
185 }
186
187 @Override
188 public void disconnect() {
189 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
190 return;
191 }
192
193 for (Map.Entry<String, List<MessageListener>> messageListeners :
194 _messageListeners.entrySet()) {
195
196 String destinationName = messageListeners.getKey();
197
198 for (MessageListener messageListener :
199 messageListeners.getValue()) {
200
201 _messageBus.unregisterMessageListener(
202 destinationName, messageListener);
203 }
204 }
205 }
206
207 @Override
208 public void setDestinationConfigurations(
209 Set<DestinationConfiguration> destinationConfigurations) {
210
211 _destinationConfigurations.addAll(destinationConfigurations);
212 }
213
214 @Override
215 public void setDestinationEventListeners(
216 Map<String, List<DestinationEventListener>> destinationEventListeners) {
217
218 _destinationEventListeners.putAll(destinationEventListeners);
219 }
220
221 @Override
222 public void setDestinations(List<Destination> destinations) {
223 _destinations.addAll(destinations);
224 }
225
226 @Override
227 public void setMessageBusEventListeners(
228 List<MessageBusEventListener> messageBusEventListeners) {
229
230 _messageBusEventListeners.addAll(messageBusEventListeners);
231 }
232
233 @Override
234 public void setMessageListeners(
235 Map<String, List<MessageListener>> messageListeners) {
236
237 for (List<MessageListener> messageListenersList :
238 messageListeners.values()) {
239
240 for (MessageListener messageListener : messageListenersList) {
241 Class<?> messageListenerClass = messageListener.getClass();
242
243 try {
244 Method setMessageBusMethod = messageListenerClass.getMethod(
245 "setMessageBus", MessageBus.class);
246
247 setMessageBusMethod.setAccessible(true);
248
249 setMessageBusMethod.invoke(messageListener, _messageBus);
250
251 continue;
252 }
253 catch (Exception e) {
254 }
255
256 try {
257 Method setMessageBusMethod =
258 messageListenerClass.getDeclaredMethod(
259 "setMessageBus", MessageBus.class);
260
261 setMessageBusMethod.setAccessible(true);
262
263 setMessageBusMethod.invoke(messageListener, _messageBus);
264 }
265 catch (Exception e) {
266 }
267 }
268 }
269
270 _messageListeners.putAll(messageListeners);
271 }
272
273
277 @Deprecated
278 @Override
279 public void setReplacementDestinations(
280 List<Destination> replacementDestinations) {
281
282 _destinations.addAll(replacementDestinations);
283 }
284
285 protected abstract ClassLoader getOperatingClassloader();
286
287 protected void initialize() {
288 Thread currentThread = Thread.currentThread();
289
290 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
291
292 ClassLoader operatingClassLoader = getOperatingClassloader();
293
294 if (contextClassLoader == operatingClassLoader) {
295 _portalMessagingConfigurator = true;
296 }
297
298 registerMessageBusEventListeners();
299
300 registerDestinationConfigurations();
301
302 registerDestinations();
303
304 registerDestinationEventListeners();
305
306 connect();
307
308 String servletContextName = ClassLoaderPool.getContextName(
309 operatingClassLoader);
310
311 if ((servletContextName != null) &&
312 !servletContextName.equals(StringPool.NULL)) {
313
314 MessagingConfiguratorRegistry.registerMessagingConfigurator(
315 servletContextName, this);
316 }
317 }
318
319 protected void registerDestinationConfigurations() {
320 if (_destinationConfigurations.isEmpty()) {
321 return;
322 }
323
324 Registry registry = RegistryUtil.getRegistry();
325
326 _destinationConfigurationServiceRegistrar =
327 registry.getServiceRegistrar(DestinationConfiguration.class);
328
329 for (DestinationConfiguration destinationConfiguration :
330 _destinationConfigurations) {
331
332 try {
333 PortalMessageBusPermission.checkListen(
334 destinationConfiguration.getDestinationName());
335 }
336 catch (SecurityException se) {
337 if (_log.isInfoEnabled()) {
338 _log.info(
339 "Rejecting destination " +
340 destinationConfiguration.getDestinationName());
341 }
342
343 continue;
344 }
345
346 Map<String, Object> properties = new HashMap<>();
347
348 properties.put(
349 "destination.name",
350 destinationConfiguration.getDestinationName());
351
352 _destinationConfigurationServiceRegistrar.registerService(
353 DestinationConfiguration.class, destinationConfiguration,
354 properties);
355 }
356 }
357
358 protected void registerDestinationEventListeners() {
359 if (_destinationEventListeners.isEmpty()) {
360 return;
361 }
362
363 Registry registry = RegistryUtil.getRegistry();
364
365 _destinationEventListenerServiceRegistrar =
366 registry.getServiceRegistrar(DestinationEventListener.class);
367
368 for (final Map.Entry<String, List<DestinationEventListener>> entry :
369 _destinationEventListeners.entrySet()) {
370
371 final String destinationName = entry.getKey();
372
373 ServiceDependencyManager serviceDependencyManager =
374 new ServiceDependencyManager();
375
376 serviceDependencyManager.addServiceDependencyListener(
377 new ServiceDependencyListener() {
378
379 @Override
380 public void dependenciesFulfilled() {
381 Map<String, Object> properties = new HashMap<>();
382
383 properties.put("destination.name", destinationName);
384
385 for (DestinationEventListener destinationEventListener :
386 entry.getValue()) {
387
388 _destinationEventListenerServiceRegistrar.
389 registerService(
390 DestinationEventListener.class,
391 destinationEventListener, properties);
392 }
393 }
394
395 @Override
396 public void destroy() {
397 }
398
399 });
400
401 Filter filter = registry.getFilter(
402 "(&(destination.name=" + destinationName + ")(objectClass=" +
403 Destination.class.getName() + "))");
404
405 serviceDependencyManager.registerDependencies(filter);
406 }
407 }
408
409 protected void registerDestinations() {
410 if (_destinations.isEmpty()) {
411 return;
412 }
413
414 Registry registry = RegistryUtil.getRegistry();
415
416 _destinationServiceRegistrar = registry.getServiceRegistrar(
417 Destination.class);
418
419 for (Destination destination : _destinations) {
420 String destinationName = destination.getName();
421
422 try {
423 PortalMessageBusPermission.checkListen(destinationName);
424 }
425 catch (SecurityException se) {
426 if (_log.isInfoEnabled()) {
427 _log.info("Rejecting destination " + destinationName);
428 }
429
430 continue;
431 }
432
433 Map<String, Object> properties = new HashMap<>();
434
435 properties.put("destination.name", destinationName);
436
437 _destinationServiceRegistrar.registerService(
438 Destination.class, destination, properties);
439 }
440 }
441
442 protected void registerMessageBusEventListeners() {
443 if (_messageBusEventListeners.isEmpty()) {
444 return;
445 }
446
447 Registry registry = RegistryUtil.getRegistry();
448
449 _messageBusEventListenerServiceRegistrar = registry.getServiceRegistrar(
450 MessageBusEventListener.class);
451
452 for (MessageBusEventListener messageBusEventListener :
453 _messageBusEventListeners) {
454
455 _messageBusEventListenerServiceRegistrar.registerService(
456 MessageBusEventListener.class, messageBusEventListener);
457 }
458 }
459
460 private static final Log _log = LogFactoryUtil.getLog(
461 AbstractMessagingConfigurator.class);
462
463 private final Set<DestinationConfiguration> _destinationConfigurations =
464 new HashSet<>();
465 private ServiceRegistrar<DestinationConfiguration>
466 _destinationConfigurationServiceRegistrar;
467 private final Map<String, List<DestinationEventListener>>
468 _destinationEventListeners = new HashMap<>();
469 private ServiceRegistrar<DestinationEventListener>
470 _destinationEventListenerServiceRegistrar;
471 private final List<Destination> _destinations = new ArrayList<>();
472 private ServiceRegistrar<Destination> _destinationServiceRegistrar;
473 private volatile MessageBus _messageBus;
474 private final List<MessageBusEventListener> _messageBusEventListeners =
475 new ArrayList<>();
476 private ServiceRegistrar<MessageBusEventListener>
477 _messageBusEventListenerServiceRegistrar;
478 private final Map<String, List<MessageListener>> _messageListeners =
479 new HashMap<>();
480 private ServiceRegistrar<MessageListener> _messageListenerServiceRegistrar;
481 private boolean _portalMessagingConfigurator;
482
483 private class DestinationServiceDependencyListener
484 implements ServiceDependencyListener {
485
486 public DestinationServiceDependencyListener(
487 String destinationName, List<MessageListener> messageListeners) {
488
489 _destinationName = destinationName;
490 _messageListeners = messageListeners;
491 }
492
493 @Override
494 public void dependenciesFulfilled() {
495 ClassLoader operatingClassLoader = getOperatingClassloader();
496
497 if (SPIUtil.isSPI()) {
498 SPI spi = SPIUtil.getSPI();
499
500 try {
501 RegistrationReference registrationReference =
502 spi.getRegistrationReference();
503
504 IntrabandRPCUtil.execute(
505 registrationReference,
506 new DestinationConfigurationProcessCallable(
507 _destinationName));
508 }
509 catch (Exception e) {
510 StringBundler sb = new StringBundler(4);
511
512 sb.append("Unable to install ");
513 sb.append(
514 DestinationConfigurationProcessCallable.class.
515 getName());
516 sb.append(" on MPI for ");
517 sb.append(_destinationName);
518
519 _log.error(sb.toString(), e);
520 }
521 }
522
523 Map<String, Object> properties = new HashMap<>();
524
525 properties.put("destination.name", _destinationName);
526 properties.put(
527 "message.listener.operating.class.loader",
528 operatingClassLoader);
529
530 for (MessageListener messageListener : _messageListeners) {
531 _messageListenerServiceRegistrar.registerService(
532 MessageListener.class, messageListener, properties);
533 }
534 }
535
536 @Override
537 public void destroy() {
538 }
539
540 private final String _destinationName;
541 private final List<MessageListener> _messageListeners;
542
543 }
544
545 }