001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging.config;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.messaging.Destination;
020    import com.liferay.portal.kernel.messaging.DestinationConfiguration;
021    import com.liferay.portal.kernel.messaging.DestinationEventListener;
022    import com.liferay.portal.kernel.messaging.DestinationFactory;
023    import com.liferay.portal.kernel.messaging.DestinationFactoryUtil;
024    import com.liferay.portal.kernel.messaging.MessageBus;
025    import com.liferay.portal.kernel.messaging.MessageBusEventListener;
026    import com.liferay.portal.kernel.messaging.MessageListener;
027    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
028    import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
029    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
030    import com.liferay.portal.kernel.resiliency.spi.SPI;
031    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
032    import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
033    import com.liferay.portal.kernel.util.ClassLoaderPool;
034    import com.liferay.portal.kernel.util.StringBundler;
035    import com.liferay.portal.kernel.util.StringPool;
036    import com.liferay.registry.Filter;
037    import com.liferay.registry.Registry;
038    import com.liferay.registry.RegistryUtil;
039    import com.liferay.registry.ServiceRegistrar;
040    import com.liferay.registry.dependency.ServiceDependencyListener;
041    import com.liferay.registry.dependency.ServiceDependencyManager;
042    
043    import java.lang.reflect.Method;
044    
045    import java.util.ArrayList;
046    import java.util.HashMap;
047    import java.util.HashSet;
048    import java.util.List;
049    import java.util.Map;
050    import java.util.Set;
051    
052    /**
053     * @author Michael C. Han
054     */
055    public abstract class AbstractMessagingConfigurator
056            implements MessagingConfigurator {
057    
058            public void afterPropertiesSet() {
059                    final ServiceDependencyManager serviceDependencyManager =
060                            new ServiceDependencyManager();
061    
062                    serviceDependencyManager.addServiceDependencyListener(
063                            new ServiceDependencyListener() {
064    
065                                    @Override
066                                    public void dependenciesFulfilled() {
067                                            Registry registry = RegistryUtil.getRegistry();
068    
069                                            _messageBus = registry.getService(MessageBus.class);
070    
071                                            initialize();
072                                    }
073    
074                                    @Override
075                                    public void destroy() {
076                                    }
077    
078                            });
079    
080                    serviceDependencyManager.registerDependencies(
081                            DestinationFactory.class, MessageBus.class);
082            }
083    
084            @Override
085            public void connect() {
086                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
087                            return;
088                    }
089    
090                    Registry registry = RegistryUtil.getRegistry();
091    
092                    _messageListenerServiceRegistrar = registry.getServiceRegistrar(
093                            MessageListener.class);
094    
095                    Thread currentThread = Thread.currentThread();
096    
097                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
098    
099                    try {
100                            ClassLoader operatingClassLoader = getOperatingClassloader();
101    
102                            currentThread.setContextClassLoader(operatingClassLoader);
103    
104                            for (Map.Entry<String, List<MessageListener>> messageListeners :
105                                            _messageListeners.entrySet()) {
106    
107                                    String destinationName = messageListeners.getKey();
108    
109                                    ServiceDependencyManager serviceDependencyManager =
110                                            new ServiceDependencyManager();
111    
112                                    serviceDependencyManager.addServiceDependencyListener(
113                                            new DestinationServiceDependencyListener(
114                                                    destinationName, messageListeners.getValue()));
115    
116                                    Filter filter = registry.getFilter(
117                                            "(&(destination.name=" + destinationName +
118                                                    ")(objectClass=" + Destination.class.getName() + "))");
119    
120                                    serviceDependencyManager.registerDependencies(filter);
121                            }
122                    }
123                    finally {
124                            currentThread.setContextClassLoader(contextClassLoader);
125                    }
126            }
127    
128            @Override
129            public void destroy() {
130                    if (_messageListenerServiceRegistrar != null) {
131                            _messageListenerServiceRegistrar.destroy();
132                    }
133    
134                    if (_destinationEventListenerServiceRegistrar != null) {
135                            _destinationEventListenerServiceRegistrar.destroy();
136                    }
137    
138                    if (_destinationServiceRegistrar != null) {
139                            _destinationServiceRegistrar.destroy();
140                    }
141    
142                    if (_messageBusEventListenerServiceRegistrar != null) {
143                            _messageBusEventListenerServiceRegistrar.destroy();
144                    }
145    
146                    _destinationConfigurations.clear();
147                    _destinationEventListeners.clear();
148                    _messageListeners.clear();
149    
150                    for (Destination destination : _destinations) {
151                            destination.destroy();
152                    }
153    
154                    _destinations.clear();
155                    _messageBusEventListeners.clear();
156    
157                    ClassLoader operatingClassLoader = getOperatingClassloader();
158    
159                    String servletContextName = ClassLoaderPool.getContextName(
160                            operatingClassLoader);
161    
162                    MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
163                            servletContextName, this);
164            }
165    
166            @Override
167            public void disconnect() {
168                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
169                            return;
170                    }
171    
172                    for (Map.Entry<String, List<MessageListener>> messageListeners :
173                                    _messageListeners.entrySet()) {
174    
175                            String destinationName = messageListeners.getKey();
176    
177                            for (MessageListener messageListener :
178                                            messageListeners.getValue()) {
179    
180                                    _messageBus.unregisterMessageListener(
181                                            destinationName, messageListener);
182                            }
183                    }
184            }
185    
186            @Override
187            public void setDestinationConfigurations(
188                    Set<DestinationConfiguration> destinationConfigurations) {
189    
190                    _destinationConfigurations.addAll(destinationConfigurations);
191            }
192    
193            @Override
194            public void setDestinationEventListeners(
195                    Map<String, List<DestinationEventListener>> destinationEventListeners) {
196    
197                    _destinationEventListeners.putAll(destinationEventListeners);
198            }
199    
200            @Override
201            public void setDestinations(List<Destination> destinations) {
202                    _destinations.addAll(destinations);
203            }
204    
205            @Override
206            public void setMessageBusEventListeners(
207                    List<MessageBusEventListener> messageBusEventListeners) {
208    
209                    _messageBusEventListeners.addAll(messageBusEventListeners);
210            }
211    
212            @Override
213            public void setMessageListeners(
214                    Map<String, List<MessageListener>> messageListeners) {
215    
216                    for (List<MessageListener> messageListenersList :
217                                    messageListeners.values()) {
218    
219                            for (MessageListener messageListener : messageListenersList) {
220                                    Class<?> messageListenerClass = messageListener.getClass();
221    
222                                    try {
223                                            Method setMessageBusMethod = messageListenerClass.getMethod(
224                                                    "setMessageBus", MessageBus.class);
225    
226                                            setMessageBusMethod.setAccessible(true);
227    
228                                            setMessageBusMethod.invoke(messageListener, _messageBus);
229    
230                                            continue;
231                                    }
232                                    catch (Exception e) {
233                                    }
234    
235                                    try {
236                                            Method setMessageBusMethod =
237                                                    messageListenerClass.getDeclaredMethod(
238                                                            "setMessageBus", MessageBus.class);
239    
240                                            setMessageBusMethod.setAccessible(true);
241    
242                                            setMessageBusMethod.invoke(messageListener, _messageBus);
243                                    }
244                                    catch (Exception e) {
245                                    }
246                            }
247                    }
248    
249                    _messageListeners.putAll(messageListeners);
250            }
251    
252            /**
253             * @param      replacementDestinations
254             * @deprecated As of 7.0.0, replaced by {@link #setDestinations(List)}
255             */
256            @Deprecated
257            @Override
258            public void setReplacementDestinations(
259                    List<Destination> replacementDestinations) {
260    
261                    _destinations.addAll(replacementDestinations);
262            }
263    
264            protected abstract ClassLoader getOperatingClassloader();
265    
266            protected void initialize() {
267                    Thread currentThread = Thread.currentThread();
268    
269                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
270    
271                    ClassLoader operatingClassLoader = getOperatingClassloader();
272    
273                    if (contextClassLoader == operatingClassLoader) {
274                            _portalMessagingConfigurator = true;
275                    }
276    
277                    registerMessageBusEventListeners();
278    
279                    registerDestinations();
280    
281                    registerDestinationEventListeners();
282    
283                    connect();
284    
285                    String servletContextName = ClassLoaderPool.getContextName(
286                            operatingClassLoader);
287    
288                    if ((servletContextName != null) &&
289                            !servletContextName.equals(StringPool.NULL)) {
290    
291                            MessagingConfiguratorRegistry.registerMessagingConfigurator(
292                                    servletContextName, this);
293                    }
294            }
295    
296            protected void registerDestinationEventListeners() {
297                    if (_destinationEventListeners.isEmpty()) {
298                            return;
299                    }
300    
301                    Registry registry = RegistryUtil.getRegistry();
302    
303                    _destinationEventListenerServiceRegistrar =
304                            registry.getServiceRegistrar(DestinationEventListener.class);
305    
306                    for (final Map.Entry<String, List<DestinationEventListener>> entry :
307                                    _destinationEventListeners.entrySet()) {
308    
309                            final String destinationName = entry.getKey();
310    
311                            ServiceDependencyManager serviceDependencyManager =
312                                    new ServiceDependencyManager();
313    
314                            serviceDependencyManager.addServiceDependencyListener(
315                                    new ServiceDependencyListener() {
316    
317                                            @Override
318                                            public void dependenciesFulfilled() {
319                                                    Map<String, Object> properties = new HashMap<>();
320    
321                                                    properties.put("destination.name", destinationName);
322    
323                                                    for (DestinationEventListener destinationEventListener :
324                                                                    entry.getValue()) {
325    
326                                                            _destinationEventListenerServiceRegistrar.
327                                                                    registerService(
328                                                                            DestinationEventListener.class,
329                                                                            destinationEventListener, properties);
330                                                    }
331                                            }
332    
333                                            @Override
334                                            public void destroy() {
335                                            }
336    
337                                    });
338    
339                            Filter filter = registry.getFilter(
340                                    "(&(destination.name=" + destinationName + ")(objectClass=" +
341                                            Destination.class.getName() + "))");
342    
343                            serviceDependencyManager.registerDependencies(filter);
344                    }
345            }
346    
347            protected void registerDestinations() {
348                    for (DestinationConfiguration destinationConfiguration :
349                                    _destinationConfigurations) {
350    
351                            try {
352                                    PortalMessageBusPermission.checkListen(
353                                            destinationConfiguration.getDestinationName());
354                            }
355                            catch (SecurityException se) {
356                                    if (_log.isInfoEnabled()) {
357                                            _log.info(
358                                                    "Rejecting destination " +
359                                                            destinationConfiguration.getDestinationName());
360                                    }
361    
362                                    continue;
363                            }
364    
365                            _destinations.add(
366                                    DestinationFactoryUtil.createDestination(
367                                            destinationConfiguration));
368                    }
369    
370                    if (_destinations.isEmpty()) {
371                            return;
372                    }
373    
374                    Registry registry = RegistryUtil.getRegistry();
375    
376                    _destinationServiceRegistrar = registry.getServiceRegistrar(
377                            Destination.class);
378    
379                    for (Destination destination : _destinations) {
380                            String destinationName = destination.getName();
381    
382                            try {
383                                    PortalMessageBusPermission.checkListen(destinationName);
384                            }
385                            catch (SecurityException se) {
386                                    if (_log.isInfoEnabled()) {
387                                            _log.info("Rejecting destination " + destinationName);
388                                    }
389    
390                                    continue;
391                            }
392    
393                            Map<String, Object> properties = new HashMap<>();
394    
395                            properties.put("destination.name", destinationName);
396    
397                            _destinationServiceRegistrar.registerService(
398                                    Destination.class, destination, properties);
399                    }
400            }
401    
402            protected void registerMessageBusEventListeners() {
403                    if (_messageBusEventListeners.isEmpty()) {
404                            return;
405                    }
406    
407                    Registry registry = RegistryUtil.getRegistry();
408    
409                    _messageBusEventListenerServiceRegistrar = registry.getServiceRegistrar(
410                            MessageBusEventListener.class);
411    
412                    for (MessageBusEventListener messageBusEventListener :
413                                    _messageBusEventListeners) {
414    
415                            _messageBusEventListenerServiceRegistrar.registerService(
416                                    MessageBusEventListener.class, messageBusEventListener);
417                    }
418            }
419    
420            private static final Log _log = LogFactoryUtil.getLog(
421                    AbstractMessagingConfigurator.class);
422    
423            private final Set<DestinationConfiguration> _destinationConfigurations =
424                    new HashSet<>();
425            private final Map<String, List<DestinationEventListener>>
426                    _destinationEventListeners = new HashMap<>();
427            private ServiceRegistrar<DestinationEventListener>
428                    _destinationEventListenerServiceRegistrar;
429            private final List<Destination> _destinations = new ArrayList<>();
430            private ServiceRegistrar<Destination> _destinationServiceRegistrar;
431            private volatile MessageBus _messageBus;
432            private final List<MessageBusEventListener> _messageBusEventListeners =
433                    new ArrayList<>();
434            private ServiceRegistrar<MessageBusEventListener>
435                    _messageBusEventListenerServiceRegistrar;
436            private final Map<String, List<MessageListener>> _messageListeners =
437                    new HashMap<>();
438            private ServiceRegistrar<MessageListener> _messageListenerServiceRegistrar;
439            private boolean _portalMessagingConfigurator;
440    
441            private class DestinationServiceDependencyListener
442                    implements ServiceDependencyListener {
443    
444                    public DestinationServiceDependencyListener(
445                            String destinationName, List<MessageListener> messageListeners) {
446    
447                            _destinationName = destinationName;
448                            _messageListeners = messageListeners;
449                    }
450    
451                    @Override
452                    public void dependenciesFulfilled() {
453                            ClassLoader operatingClassLoader = getOperatingClassloader();
454    
455                            if (SPIUtil.isSPI()) {
456                                    SPI spi = SPIUtil.getSPI();
457    
458                                    try {
459                                            RegistrationReference registrationReference =
460                                                    spi.getRegistrationReference();
461    
462                                            IntrabandRPCUtil.execute(
463                                                    registrationReference,
464                                                    new DestinationConfigurationProcessCallable(
465                                                            _destinationName));
466                                    }
467                                    catch (Exception e) {
468                                            StringBundler sb = new StringBundler(4);
469    
470                                            sb.append("Unable to install ");
471                                            sb.append(
472                                                    DestinationConfigurationProcessCallable.class.
473                                                            getName());
474                                            sb.append(" on MPI for ");
475                                            sb.append(_destinationName);
476    
477                                            _log.error(sb.toString(), e);
478                                    }
479                            }
480    
481                            Map<String, Object> properties = new HashMap<>();
482    
483                            properties.put("destination.name", _destinationName);
484                            properties.put(
485                                    "message.listener.operating.class.loader",
486                                    operatingClassLoader);
487    
488                            for (MessageListener messageListener : _messageListeners) {
489                                    _messageListenerServiceRegistrar.registerService(
490                                            MessageListener.class, messageListener, properties);
491                            }
492                    }
493    
494                    @Override
495                    public void destroy() {
496                    }
497    
498                    private final String _destinationName;
499                    private final List<MessageListener> _messageListeners;
500    
501            }
502    
503    }