001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging.config;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.messaging.Destination;
020    import com.liferay.portal.kernel.messaging.DestinationConfiguration;
021    import com.liferay.portal.kernel.messaging.DestinationEventListener;
022    import com.liferay.portal.kernel.messaging.DestinationFactory;
023    import com.liferay.portal.kernel.messaging.MessageBus;
024    import com.liferay.portal.kernel.messaging.MessageBusEventListener;
025    import com.liferay.portal.kernel.messaging.MessageListener;
026    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
027    import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
028    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
029    import com.liferay.portal.kernel.resiliency.spi.SPI;
030    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
031    import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
032    import com.liferay.portal.kernel.util.ClassLoaderPool;
033    import com.liferay.portal.kernel.util.StringBundler;
034    import com.liferay.portal.kernel.util.StringPool;
035    import com.liferay.registry.Filter;
036    import com.liferay.registry.Registry;
037    import com.liferay.registry.RegistryUtil;
038    import com.liferay.registry.ServiceFinalizer;
039    import com.liferay.registry.ServiceReference;
040    import com.liferay.registry.ServiceRegistrar;
041    import com.liferay.registry.dependency.ServiceDependencyListener;
042    import com.liferay.registry.dependency.ServiceDependencyManager;
043    
044    import java.lang.reflect.Method;
045    
046    import java.util.ArrayList;
047    import java.util.HashMap;
048    import java.util.HashSet;
049    import java.util.List;
050    import java.util.Map;
051    import java.util.Set;
052    
053    /**
054     * @author Michael C. Han
055     */
056    public abstract class AbstractMessagingConfigurator
057            implements MessagingConfigurator {
058    
059            public void afterPropertiesSet() {
060                    final ServiceDependencyManager serviceDependencyManager =
061                            new ServiceDependencyManager();
062    
063                    serviceDependencyManager.addServiceDependencyListener(
064                            new ServiceDependencyListener() {
065    
066                                    @Override
067                                    public void dependenciesFulfilled() {
068                                            Registry registry = RegistryUtil.getRegistry();
069    
070                                            _messageBus = registry.getService(MessageBus.class);
071    
072                                            initialize();
073                                    }
074    
075                                    @Override
076                                    public void destroy() {
077                                    }
078    
079                            });
080    
081                    serviceDependencyManager.registerDependencies(
082                            DestinationFactory.class, MessageBus.class);
083            }
084    
085            @Override
086            public void connect() {
087                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
088                            return;
089                    }
090    
091                    Registry registry = RegistryUtil.getRegistry();
092    
093                    _messageListenerServiceRegistrar = registry.getServiceRegistrar(
094                            MessageListener.class);
095    
096                    Thread currentThread = Thread.currentThread();
097    
098                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
099    
100                    try {
101                            ClassLoader operatingClassLoader = getOperatingClassloader();
102    
103                            currentThread.setContextClassLoader(operatingClassLoader);
104    
105                            for (Map.Entry<String, List<MessageListener>> messageListeners :
106                                            _messageListeners.entrySet()) {
107    
108                                    String destinationName = messageListeners.getKey();
109    
110                                    ServiceDependencyManager serviceDependencyManager =
111                                            new ServiceDependencyManager();
112    
113                                    serviceDependencyManager.addServiceDependencyListener(
114                                            new DestinationServiceDependencyListener(
115                                                    destinationName, messageListeners.getValue()));
116    
117                                    Filter filter = registry.getFilter(
118                                            "(&(destination.name=" + destinationName +
119                                                    ")(objectClass=" + Destination.class.getName() + "))");
120    
121                                    serviceDependencyManager.registerDependencies(filter);
122                            }
123                    }
124                    finally {
125                            currentThread.setContextClassLoader(contextClassLoader);
126                    }
127            }
128    
129            @Override
130            public void destroy() {
131                    disconnect();
132    
133                    _messageBusEventListeners.clear();
134    
135                    if (_messageBusEventListenerServiceRegistrar != null) {
136                            _messageBusEventListenerServiceRegistrar.destroy();
137                    }
138    
139                    _destinationConfigurations.clear();
140    
141                    if (_destinationConfigurationServiceRegistrar != null) {
142                            _destinationConfigurationServiceRegistrar.destroy();
143                    }
144    
145                    _destinations.clear();
146    
147                    if (_destinationServiceRegistrar != null) {
148                            _destinationServiceRegistrar.destroy(
149                                    new ServiceFinalizer<Destination>() {
150    
151                                            @Override
152                                            public void finalize(
153                                                    ServiceReference<Destination> serviceReference,
154                                                    Destination destination) {
155    
156                                                    destination.close();
157    
158                                                    destination.removeDestinationEventListeners();
159    
160                                                    destination.unregisterMessageListeners();
161                                            }
162    
163                                    });
164                    }
165    
166                    _messageListeners.clear();
167    
168                    if (_messageListenerServiceRegistrar != null) {
169                            _messageListenerServiceRegistrar.destroy();
170                    }
171    
172                    _destinationEventListeners.clear();
173    
174                    if (_destinationEventListenerServiceRegistrar != null) {
175                            _destinationEventListenerServiceRegistrar.destroy();
176                    }
177    
178                    ClassLoader operatingClassLoader = getOperatingClassloader();
179    
180                    String servletContextName = ClassLoaderPool.getContextName(
181                            operatingClassLoader);
182    
183                    MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
184                            servletContextName, this);
185            }
186    
187            @Override
188            public void disconnect() {
189                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
190                            return;
191                    }
192    
193                    for (Map.Entry<String, List<MessageListener>> messageListeners :
194                                    _messageListeners.entrySet()) {
195    
196                            String destinationName = messageListeners.getKey();
197    
198                            for (MessageListener messageListener :
199                                            messageListeners.getValue()) {
200    
201                                    _messageBus.unregisterMessageListener(
202                                            destinationName, messageListener);
203                            }
204                    }
205            }
206    
207            @Override
208            public void setDestinationConfigurations(
209                    Set<DestinationConfiguration> destinationConfigurations) {
210    
211                    _destinationConfigurations.addAll(destinationConfigurations);
212            }
213    
214            @Override
215            public void setDestinationEventListeners(
216                    Map<String, List<DestinationEventListener>> destinationEventListeners) {
217    
218                    _destinationEventListeners.putAll(destinationEventListeners);
219            }
220    
221            @Override
222            public void setDestinations(List<Destination> destinations) {
223                    _destinations.addAll(destinations);
224            }
225    
226            @Override
227            public void setMessageBusEventListeners(
228                    List<MessageBusEventListener> messageBusEventListeners) {
229    
230                    _messageBusEventListeners.addAll(messageBusEventListeners);
231            }
232    
233            @Override
234            public void setMessageListeners(
235                    Map<String, List<MessageListener>> messageListeners) {
236    
237                    for (List<MessageListener> messageListenersList :
238                                    messageListeners.values()) {
239    
240                            for (MessageListener messageListener : messageListenersList) {
241                                    Class<?> messageListenerClass = messageListener.getClass();
242    
243                                    try {
244                                            Method setMessageBusMethod = messageListenerClass.getMethod(
245                                                    "setMessageBus", MessageBus.class);
246    
247                                            setMessageBusMethod.setAccessible(true);
248    
249                                            setMessageBusMethod.invoke(messageListener, _messageBus);
250    
251                                            continue;
252                                    }
253                                    catch (Exception e) {
254                                    }
255    
256                                    try {
257                                            Method setMessageBusMethod =
258                                                    messageListenerClass.getDeclaredMethod(
259                                                            "setMessageBus", MessageBus.class);
260    
261                                            setMessageBusMethod.setAccessible(true);
262    
263                                            setMessageBusMethod.invoke(messageListener, _messageBus);
264                                    }
265                                    catch (Exception e) {
266                                    }
267                            }
268                    }
269    
270                    _messageListeners.putAll(messageListeners);
271            }
272    
273            /**
274             * @param      replacementDestinations
275             * @deprecated As of 7.0.0, replaced by {@link #setDestinations(List)}
276             */
277            @Deprecated
278            @Override
279            public void setReplacementDestinations(
280                    List<Destination> replacementDestinations) {
281    
282                    _destinations.addAll(replacementDestinations);
283            }
284    
285            protected abstract ClassLoader getOperatingClassloader();
286    
287            protected void initialize() {
288                    Thread currentThread = Thread.currentThread();
289    
290                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
291    
292                    ClassLoader operatingClassLoader = getOperatingClassloader();
293    
294                    if (contextClassLoader == operatingClassLoader) {
295                            _portalMessagingConfigurator = true;
296                    }
297    
298                    registerMessageBusEventListeners();
299    
300                    registerDestinationConfigurations();
301    
302                    registerDestinations();
303    
304                    registerDestinationEventListeners();
305    
306                    connect();
307    
308                    String servletContextName = ClassLoaderPool.getContextName(
309                            operatingClassLoader);
310    
311                    if ((servletContextName != null) &&
312                            !servletContextName.equals(StringPool.NULL)) {
313    
314                            MessagingConfiguratorRegistry.registerMessagingConfigurator(
315                                    servletContextName, this);
316                    }
317            }
318    
319            protected void registerDestinationConfigurations() {
320                    if (_destinationConfigurations.isEmpty()) {
321                            return;
322                    }
323    
324                    Registry registry = RegistryUtil.getRegistry();
325    
326                    _destinationConfigurationServiceRegistrar =
327                            registry.getServiceRegistrar(DestinationConfiguration.class);
328    
329                    for (DestinationConfiguration destinationConfiguration :
330                                    _destinationConfigurations) {
331    
332                            try {
333                                    PortalMessageBusPermission.checkListen(
334                                            destinationConfiguration.getDestinationName());
335                            }
336                            catch (SecurityException se) {
337                                    if (_log.isInfoEnabled()) {
338                                            _log.info(
339                                                    "Rejecting destination " +
340                                                            destinationConfiguration.getDestinationName());
341                                    }
342    
343                                    continue;
344                            }
345    
346                            Map<String, Object> properties = new HashMap<>();
347    
348                            properties.put(
349                                    "destination.name",
350                                    destinationConfiguration.getDestinationName());
351    
352                            _destinationConfigurationServiceRegistrar.registerService(
353                                    DestinationConfiguration.class, destinationConfiguration,
354                                    properties);
355                    }
356            }
357    
358            protected void registerDestinationEventListeners() {
359                    if (_destinationEventListeners.isEmpty()) {
360                            return;
361                    }
362    
363                    Registry registry = RegistryUtil.getRegistry();
364    
365                    _destinationEventListenerServiceRegistrar =
366                            registry.getServiceRegistrar(DestinationEventListener.class);
367    
368                    for (final Map.Entry<String, List<DestinationEventListener>> entry :
369                                    _destinationEventListeners.entrySet()) {
370    
371                            final String destinationName = entry.getKey();
372    
373                            ServiceDependencyManager serviceDependencyManager =
374                                    new ServiceDependencyManager();
375    
376                            serviceDependencyManager.addServiceDependencyListener(
377                                    new ServiceDependencyListener() {
378    
379                                            @Override
380                                            public void dependenciesFulfilled() {
381                                                    Map<String, Object> properties = new HashMap<>();
382    
383                                                    properties.put("destination.name", destinationName);
384    
385                                                    for (DestinationEventListener destinationEventListener :
386                                                                    entry.getValue()) {
387    
388                                                            _destinationEventListenerServiceRegistrar.
389                                                                    registerService(
390                                                                            DestinationEventListener.class,
391                                                                            destinationEventListener, properties);
392                                                    }
393                                            }
394    
395                                            @Override
396                                            public void destroy() {
397                                            }
398    
399                                    });
400    
401                            Filter filter = registry.getFilter(
402                                    "(&(destination.name=" + destinationName + ")(objectClass=" +
403                                            Destination.class.getName() + "))");
404    
405                            serviceDependencyManager.registerDependencies(filter);
406                    }
407            }
408    
409            protected void registerDestinations() {
410                    if (_destinations.isEmpty()) {
411                            return;
412                    }
413    
414                    Registry registry = RegistryUtil.getRegistry();
415    
416                    _destinationServiceRegistrar = registry.getServiceRegistrar(
417                            Destination.class);
418    
419                    for (Destination destination : _destinations) {
420                            String destinationName = destination.getName();
421    
422                            try {
423                                    PortalMessageBusPermission.checkListen(destinationName);
424                            }
425                            catch (SecurityException se) {
426                                    if (_log.isInfoEnabled()) {
427                                            _log.info("Rejecting destination " + destinationName);
428                                    }
429    
430                                    continue;
431                            }
432    
433                            Map<String, Object> properties = new HashMap<>();
434    
435                            properties.put("destination.name", destinationName);
436    
437                            _destinationServiceRegistrar.registerService(
438                                    Destination.class, destination, properties);
439                    }
440            }
441    
442            protected void registerMessageBusEventListeners() {
443                    if (_messageBusEventListeners.isEmpty()) {
444                            return;
445                    }
446    
447                    Registry registry = RegistryUtil.getRegistry();
448    
449                    _messageBusEventListenerServiceRegistrar = registry.getServiceRegistrar(
450                            MessageBusEventListener.class);
451    
452                    for (MessageBusEventListener messageBusEventListener :
453                                    _messageBusEventListeners) {
454    
455                            _messageBusEventListenerServiceRegistrar.registerService(
456                                    MessageBusEventListener.class, messageBusEventListener);
457                    }
458            }
459    
460            private static final Log _log = LogFactoryUtil.getLog(
461                    AbstractMessagingConfigurator.class);
462    
463            private final Set<DestinationConfiguration> _destinationConfigurations =
464                    new HashSet<>();
465            private ServiceRegistrar<DestinationConfiguration>
466                    _destinationConfigurationServiceRegistrar;
467            private final Map<String, List<DestinationEventListener>>
468                    _destinationEventListeners = new HashMap<>();
469            private ServiceRegistrar<DestinationEventListener>
470                    _destinationEventListenerServiceRegistrar;
471            private final List<Destination> _destinations = new ArrayList<>();
472            private ServiceRegistrar<Destination> _destinationServiceRegistrar;
473            private volatile MessageBus _messageBus;
474            private final List<MessageBusEventListener> _messageBusEventListeners =
475                    new ArrayList<>();
476            private ServiceRegistrar<MessageBusEventListener>
477                    _messageBusEventListenerServiceRegistrar;
478            private final Map<String, List<MessageListener>> _messageListeners =
479                    new HashMap<>();
480            private ServiceRegistrar<MessageListener> _messageListenerServiceRegistrar;
481            private boolean _portalMessagingConfigurator;
482    
483            private class DestinationServiceDependencyListener
484                    implements ServiceDependencyListener {
485    
486                    public DestinationServiceDependencyListener(
487                            String destinationName, List<MessageListener> messageListeners) {
488    
489                            _destinationName = destinationName;
490                            _messageListeners = messageListeners;
491                    }
492    
493                    @Override
494                    public void dependenciesFulfilled() {
495                            ClassLoader operatingClassLoader = getOperatingClassloader();
496    
497                            if (SPIUtil.isSPI()) {
498                                    SPI spi = SPIUtil.getSPI();
499    
500                                    try {
501                                            RegistrationReference registrationReference =
502                                                    spi.getRegistrationReference();
503    
504                                            IntrabandRPCUtil.execute(
505                                                    registrationReference,
506                                                    new DestinationConfigurationProcessCallable(
507                                                            _destinationName));
508                                    }
509                                    catch (Exception e) {
510                                            StringBundler sb = new StringBundler(4);
511    
512                                            sb.append("Unable to install ");
513                                            sb.append(
514                                                    DestinationConfigurationProcessCallable.class.
515                                                            getName());
516                                            sb.append(" on MPI for ");
517                                            sb.append(_destinationName);
518    
519                                            _log.error(sb.toString(), e);
520                                    }
521                            }
522    
523                            Map<String, Object> properties = new HashMap<>();
524    
525                            properties.put("destination.name", _destinationName);
526                            properties.put(
527                                    "message.listener.operating.class.loader",
528                                    operatingClassLoader);
529    
530                            for (MessageListener messageListener : _messageListeners) {
531                                    _messageListenerServiceRegistrar.registerService(
532                                            MessageListener.class, messageListener, properties);
533                            }
534                    }
535    
536                    @Override
537                    public void destroy() {
538                    }
539    
540                    private final String _destinationName;
541                    private final List<MessageListener> _messageListeners;
542    
543            }
544    
545    }