001
014
015 package com.liferay.portal.kernel.messaging.config;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.messaging.Destination;
020 import com.liferay.portal.kernel.messaging.DestinationConfiguration;
021 import com.liferay.portal.kernel.messaging.DestinationEventListener;
022 import com.liferay.portal.kernel.messaging.DestinationFactory;
023 import com.liferay.portal.kernel.messaging.DestinationFactoryUtil;
024 import com.liferay.portal.kernel.messaging.MessageBus;
025 import com.liferay.portal.kernel.messaging.MessageBusEventListener;
026 import com.liferay.portal.kernel.messaging.MessageListener;
027 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
028 import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
029 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
030 import com.liferay.portal.kernel.resiliency.spi.SPI;
031 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
032 import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
033 import com.liferay.portal.kernel.util.ClassLoaderPool;
034 import com.liferay.portal.kernel.util.StringBundler;
035 import com.liferay.portal.kernel.util.StringPool;
036 import com.liferay.registry.Filter;
037 import com.liferay.registry.Registry;
038 import com.liferay.registry.RegistryUtil;
039 import com.liferay.registry.ServiceFinalizer;
040 import com.liferay.registry.ServiceReference;
041 import com.liferay.registry.ServiceRegistrar;
042 import com.liferay.registry.dependency.ServiceDependencyListener;
043 import com.liferay.registry.dependency.ServiceDependencyManager;
044
045 import java.lang.reflect.Method;
046
047 import java.util.ArrayList;
048 import java.util.HashMap;
049 import java.util.HashSet;
050 import java.util.List;
051 import java.util.Map;
052 import java.util.Set;
053
054
057 public abstract class AbstractMessagingConfigurator
058 implements MessagingConfigurator {
059
060 public void afterPropertiesSet() {
061 final ServiceDependencyManager serviceDependencyManager =
062 new ServiceDependencyManager();
063
064 serviceDependencyManager.addServiceDependencyListener(
065 new ServiceDependencyListener() {
066
067 @Override
068 public void dependenciesFulfilled() {
069 Registry registry = RegistryUtil.getRegistry();
070
071 _messageBus = registry.getService(MessageBus.class);
072
073 initialize();
074 }
075
076 @Override
077 public void destroy() {
078 }
079
080 });
081
082 serviceDependencyManager.registerDependencies(
083 DestinationFactory.class, MessageBus.class);
084 }
085
086 @Override
087 public void connect() {
088 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
089 return;
090 }
091
092 Registry registry = RegistryUtil.getRegistry();
093
094 _messageListenerServiceRegistrar = registry.getServiceRegistrar(
095 MessageListener.class);
096
097 Thread currentThread = Thread.currentThread();
098
099 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
100
101 try {
102 ClassLoader operatingClassLoader = getOperatingClassloader();
103
104 currentThread.setContextClassLoader(operatingClassLoader);
105
106 for (Map.Entry<String, List<MessageListener>> messageListeners :
107 _messageListeners.entrySet()) {
108
109 String destinationName = messageListeners.getKey();
110
111 ServiceDependencyManager serviceDependencyManager =
112 new ServiceDependencyManager();
113
114 serviceDependencyManager.addServiceDependencyListener(
115 new DestinationServiceDependencyListener(
116 destinationName, messageListeners.getValue()));
117
118 Filter filter = registry.getFilter(
119 "(&(destination.name=" + destinationName +
120 ")(objectClass=" + Destination.class.getName() + "))");
121
122 serviceDependencyManager.registerDependencies(filter);
123 }
124 }
125 finally {
126 currentThread.setContextClassLoader(contextClassLoader);
127 }
128 }
129
130 @Override
131 public void destroy() {
132 disconnect();
133
134 _messageBusEventListeners.clear();
135
136 if (_messageBusEventListenerServiceRegistrar != null) {
137 _messageBusEventListenerServiceRegistrar.destroy();
138 }
139
140 _destinationConfigurations.clear();
141
142 _destinations.clear();
143
144 if (_destinationServiceRegistrar != null) {
145 _destinationServiceRegistrar.destroy(
146 new ServiceFinalizer<Destination>() {
147
148 @Override
149 public void finalize(
150 ServiceReference<Destination> serviceReference,
151 Destination destination) {
152
153 destination.close();
154
155 destination.removeDestinationEventListeners();
156
157 destination.unregisterMessageListeners();
158 }
159
160 });
161 }
162
163 _messageListeners.clear();
164
165 if (_messageListenerServiceRegistrar != null) {
166 _messageListenerServiceRegistrar.destroy();
167 }
168
169 _destinationEventListeners.clear();
170
171 if (_destinationEventListenerServiceRegistrar != null) {
172 _destinationEventListenerServiceRegistrar.destroy();
173 }
174
175 ClassLoader operatingClassLoader = getOperatingClassloader();
176
177 String servletContextName = ClassLoaderPool.getContextName(
178 operatingClassLoader);
179
180 MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
181 servletContextName, this);
182 }
183
184 @Override
185 public void disconnect() {
186 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
187 return;
188 }
189
190 for (Map.Entry<String, List<MessageListener>> messageListeners :
191 _messageListeners.entrySet()) {
192
193 String destinationName = messageListeners.getKey();
194
195 for (MessageListener messageListener :
196 messageListeners.getValue()) {
197
198 _messageBus.unregisterMessageListener(
199 destinationName, messageListener);
200 }
201 }
202 }
203
204 @Override
205 public void setDestinationConfigurations(
206 Set<DestinationConfiguration> destinationConfigurations) {
207
208 _destinationConfigurations.addAll(destinationConfigurations);
209 }
210
211 @Override
212 public void setDestinationEventListeners(
213 Map<String, List<DestinationEventListener>> destinationEventListeners) {
214
215 _destinationEventListeners.putAll(destinationEventListeners);
216 }
217
218 @Override
219 public void setDestinations(List<Destination> destinations) {
220 _destinations.addAll(destinations);
221 }
222
223 @Override
224 public void setMessageBusEventListeners(
225 List<MessageBusEventListener> messageBusEventListeners) {
226
227 _messageBusEventListeners.addAll(messageBusEventListeners);
228 }
229
230 @Override
231 public void setMessageListeners(
232 Map<String, List<MessageListener>> messageListeners) {
233
234 for (List<MessageListener> messageListenersList :
235 messageListeners.values()) {
236
237 for (MessageListener messageListener : messageListenersList) {
238 Class<?> messageListenerClass = messageListener.getClass();
239
240 try {
241 Method setMessageBusMethod = messageListenerClass.getMethod(
242 "setMessageBus", MessageBus.class);
243
244 setMessageBusMethod.setAccessible(true);
245
246 setMessageBusMethod.invoke(messageListener, _messageBus);
247
248 continue;
249 }
250 catch (Exception e) {
251 }
252
253 try {
254 Method setMessageBusMethod =
255 messageListenerClass.getDeclaredMethod(
256 "setMessageBus", MessageBus.class);
257
258 setMessageBusMethod.setAccessible(true);
259
260 setMessageBusMethod.invoke(messageListener, _messageBus);
261 }
262 catch (Exception e) {
263 }
264 }
265 }
266
267 _messageListeners.putAll(messageListeners);
268 }
269
270
274 @Deprecated
275 @Override
276 public void setReplacementDestinations(
277 List<Destination> replacementDestinations) {
278
279 _destinations.addAll(replacementDestinations);
280 }
281
282 protected abstract ClassLoader getOperatingClassloader();
283
284 protected void initialize() {
285 Thread currentThread = Thread.currentThread();
286
287 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
288
289 ClassLoader operatingClassLoader = getOperatingClassloader();
290
291 if (contextClassLoader == operatingClassLoader) {
292 _portalMessagingConfigurator = true;
293 }
294
295 registerMessageBusEventListeners();
296
297 registerDestinations();
298
299 registerDestinationEventListeners();
300
301 connect();
302
303 String servletContextName = ClassLoaderPool.getContextName(
304 operatingClassLoader);
305
306 if ((servletContextName != null) &&
307 !servletContextName.equals(StringPool.NULL)) {
308
309 MessagingConfiguratorRegistry.registerMessagingConfigurator(
310 servletContextName, this);
311 }
312 }
313
314 protected void registerDestinationEventListeners() {
315 if (_destinationEventListeners.isEmpty()) {
316 return;
317 }
318
319 Registry registry = RegistryUtil.getRegistry();
320
321 _destinationEventListenerServiceRegistrar =
322 registry.getServiceRegistrar(DestinationEventListener.class);
323
324 for (final Map.Entry<String, List<DestinationEventListener>> entry :
325 _destinationEventListeners.entrySet()) {
326
327 final String destinationName = entry.getKey();
328
329 ServiceDependencyManager serviceDependencyManager =
330 new ServiceDependencyManager();
331
332 serviceDependencyManager.addServiceDependencyListener(
333 new ServiceDependencyListener() {
334
335 @Override
336 public void dependenciesFulfilled() {
337 Map<String, Object> properties = new HashMap<>();
338
339 properties.put("destination.name", destinationName);
340
341 for (DestinationEventListener destinationEventListener :
342 entry.getValue()) {
343
344 _destinationEventListenerServiceRegistrar.
345 registerService(
346 DestinationEventListener.class,
347 destinationEventListener, properties);
348 }
349 }
350
351 @Override
352 public void destroy() {
353 }
354
355 });
356
357 Filter filter = registry.getFilter(
358 "(&(destination.name=" + destinationName + ")(objectClass=" +
359 Destination.class.getName() + "))");
360
361 serviceDependencyManager.registerDependencies(filter);
362 }
363 }
364
365 protected void registerDestinations() {
366 for (DestinationConfiguration destinationConfiguration :
367 _destinationConfigurations) {
368
369 try {
370 PortalMessageBusPermission.checkListen(
371 destinationConfiguration.getDestinationName());
372 }
373 catch (SecurityException se) {
374 if (_log.isInfoEnabled()) {
375 _log.info(
376 "Rejecting destination " +
377 destinationConfiguration.getDestinationName());
378 }
379
380 continue;
381 }
382
383 _destinations.add(
384 DestinationFactoryUtil.createDestination(
385 destinationConfiguration));
386 }
387
388 if (_destinations.isEmpty()) {
389 return;
390 }
391
392 Registry registry = RegistryUtil.getRegistry();
393
394 _destinationServiceRegistrar = registry.getServiceRegistrar(
395 Destination.class);
396
397 for (Destination destination : _destinations) {
398 String destinationName = destination.getName();
399
400 try {
401 PortalMessageBusPermission.checkListen(destinationName);
402 }
403 catch (SecurityException se) {
404 if (_log.isInfoEnabled()) {
405 _log.info("Rejecting destination " + destinationName);
406 }
407
408 continue;
409 }
410
411 Map<String, Object> properties = new HashMap<>();
412
413 properties.put("destination.name", destinationName);
414
415 _destinationServiceRegistrar.registerService(
416 Destination.class, destination, properties);
417 }
418 }
419
420 protected void registerMessageBusEventListeners() {
421 if (_messageBusEventListeners.isEmpty()) {
422 return;
423 }
424
425 Registry registry = RegistryUtil.getRegistry();
426
427 _messageBusEventListenerServiceRegistrar = registry.getServiceRegistrar(
428 MessageBusEventListener.class);
429
430 for (MessageBusEventListener messageBusEventListener :
431 _messageBusEventListeners) {
432
433 _messageBusEventListenerServiceRegistrar.registerService(
434 MessageBusEventListener.class, messageBusEventListener);
435 }
436 }
437
438 private static final Log _log = LogFactoryUtil.getLog(
439 AbstractMessagingConfigurator.class);
440
441 private final Set<DestinationConfiguration> _destinationConfigurations =
442 new HashSet<>();
443 private final Map<String, List<DestinationEventListener>>
444 _destinationEventListeners = new HashMap<>();
445 private ServiceRegistrar<DestinationEventListener>
446 _destinationEventListenerServiceRegistrar;
447 private final List<Destination> _destinations = new ArrayList<>();
448 private ServiceRegistrar<Destination> _destinationServiceRegistrar;
449 private volatile MessageBus _messageBus;
450 private final List<MessageBusEventListener> _messageBusEventListeners =
451 new ArrayList<>();
452 private ServiceRegistrar<MessageBusEventListener>
453 _messageBusEventListenerServiceRegistrar;
454 private final Map<String, List<MessageListener>> _messageListeners =
455 new HashMap<>();
456 private ServiceRegistrar<MessageListener> _messageListenerServiceRegistrar;
457 private boolean _portalMessagingConfigurator;
458
459 private class DestinationServiceDependencyListener
460 implements ServiceDependencyListener {
461
462 public DestinationServiceDependencyListener(
463 String destinationName, List<MessageListener> messageListeners) {
464
465 _destinationName = destinationName;
466 _messageListeners = messageListeners;
467 }
468
469 @Override
470 public void dependenciesFulfilled() {
471 ClassLoader operatingClassLoader = getOperatingClassloader();
472
473 if (SPIUtil.isSPI()) {
474 SPI spi = SPIUtil.getSPI();
475
476 try {
477 RegistrationReference registrationReference =
478 spi.getRegistrationReference();
479
480 IntrabandRPCUtil.execute(
481 registrationReference,
482 new DestinationConfigurationProcessCallable(
483 _destinationName));
484 }
485 catch (Exception e) {
486 StringBundler sb = new StringBundler(4);
487
488 sb.append("Unable to install ");
489 sb.append(
490 DestinationConfigurationProcessCallable.class.
491 getName());
492 sb.append(" on MPI for ");
493 sb.append(_destinationName);
494
495 _log.error(sb.toString(), e);
496 }
497 }
498
499 Map<String, Object> properties = new HashMap<>();
500
501 properties.put("destination.name", _destinationName);
502 properties.put(
503 "message.listener.operating.class.loader",
504 operatingClassLoader);
505
506 for (MessageListener messageListener : _messageListeners) {
507 _messageListenerServiceRegistrar.registerService(
508 MessageListener.class, messageListener, properties);
509 }
510 }
511
512 @Override
513 public void destroy() {
514 }
515
516 private final String _destinationName;
517 private final List<MessageListener> _messageListeners;
518
519 }
520
521 }