001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging.config;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.messaging.Destination;
020    import com.liferay.portal.kernel.messaging.DestinationConfiguration;
021    import com.liferay.portal.kernel.messaging.DestinationEventListener;
022    import com.liferay.portal.kernel.messaging.DestinationFactory;
023    import com.liferay.portal.kernel.messaging.DestinationFactoryUtil;
024    import com.liferay.portal.kernel.messaging.MessageBus;
025    import com.liferay.portal.kernel.messaging.MessageBusEventListener;
026    import com.liferay.portal.kernel.messaging.MessageListener;
027    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
028    import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
029    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
030    import com.liferay.portal.kernel.resiliency.spi.SPI;
031    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
032    import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
033    import com.liferay.portal.kernel.util.ClassLoaderPool;
034    import com.liferay.portal.kernel.util.StringBundler;
035    import com.liferay.portal.kernel.util.StringPool;
036    import com.liferay.registry.Filter;
037    import com.liferay.registry.Registry;
038    import com.liferay.registry.RegistryUtil;
039    import com.liferay.registry.ServiceFinalizer;
040    import com.liferay.registry.ServiceReference;
041    import com.liferay.registry.ServiceRegistrar;
042    import com.liferay.registry.dependency.ServiceDependencyListener;
043    import com.liferay.registry.dependency.ServiceDependencyManager;
044    
045    import java.lang.reflect.Method;
046    
047    import java.util.ArrayList;
048    import java.util.HashMap;
049    import java.util.HashSet;
050    import java.util.List;
051    import java.util.Map;
052    import java.util.Set;
053    
054    /**
055     * @author Michael C. Han
056     */
057    public abstract class AbstractMessagingConfigurator
058            implements MessagingConfigurator {
059    
060            public void afterPropertiesSet() {
061                    final ServiceDependencyManager serviceDependencyManager =
062                            new ServiceDependencyManager();
063    
064                    serviceDependencyManager.addServiceDependencyListener(
065                            new ServiceDependencyListener() {
066    
067                                    @Override
068                                    public void dependenciesFulfilled() {
069                                            Registry registry = RegistryUtil.getRegistry();
070    
071                                            _messageBus = registry.getService(MessageBus.class);
072    
073                                            initialize();
074                                    }
075    
076                                    @Override
077                                    public void destroy() {
078                                    }
079    
080                            });
081    
082                    serviceDependencyManager.registerDependencies(
083                            DestinationFactory.class, MessageBus.class);
084            }
085    
086            @Override
087            public void connect() {
088                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
089                            return;
090                    }
091    
092                    Registry registry = RegistryUtil.getRegistry();
093    
094                    _messageListenerServiceRegistrar = registry.getServiceRegistrar(
095                            MessageListener.class);
096    
097                    Thread currentThread = Thread.currentThread();
098    
099                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
100    
101                    try {
102                            ClassLoader operatingClassLoader = getOperatingClassloader();
103    
104                            currentThread.setContextClassLoader(operatingClassLoader);
105    
106                            for (Map.Entry<String, List<MessageListener>> messageListeners :
107                                            _messageListeners.entrySet()) {
108    
109                                    String destinationName = messageListeners.getKey();
110    
111                                    ServiceDependencyManager serviceDependencyManager =
112                                            new ServiceDependencyManager();
113    
114                                    serviceDependencyManager.addServiceDependencyListener(
115                                            new DestinationServiceDependencyListener(
116                                                    destinationName, messageListeners.getValue()));
117    
118                                    Filter filter = registry.getFilter(
119                                            "(&(destination.name=" + destinationName +
120                                                    ")(objectClass=" + Destination.class.getName() + "))");
121    
122                                    serviceDependencyManager.registerDependencies(filter);
123                            }
124                    }
125                    finally {
126                            currentThread.setContextClassLoader(contextClassLoader);
127                    }
128            }
129    
130            @Override
131            public void destroy() {
132                    disconnect();
133    
134                    _messageBusEventListeners.clear();
135    
136                    if (_messageBusEventListenerServiceRegistrar != null) {
137                            _messageBusEventListenerServiceRegistrar.destroy();
138                    }
139    
140                    _destinationConfigurations.clear();
141    
142                    _destinations.clear();
143    
144                    if (_destinationServiceRegistrar != null) {
145                            _destinationServiceRegistrar.destroy(
146                                    new ServiceFinalizer<Destination>() {
147    
148                                            @Override
149                                            public void finalize(
150                                                    ServiceReference<Destination> serviceReference,
151                                                    Destination destination) {
152    
153                                                    destination.close();
154    
155                                                    destination.removeDestinationEventListeners();
156    
157                                                    destination.unregisterMessageListeners();
158                                            }
159    
160                                    });
161                    }
162    
163                    _messageListeners.clear();
164    
165                    if (_messageListenerServiceRegistrar != null) {
166                            _messageListenerServiceRegistrar.destroy();
167                    }
168    
169                    _destinationEventListeners.clear();
170    
171                    if (_destinationEventListenerServiceRegistrar != null) {
172                            _destinationEventListenerServiceRegistrar.destroy();
173                    }
174    
175                    ClassLoader operatingClassLoader = getOperatingClassloader();
176    
177                    String servletContextName = ClassLoaderPool.getContextName(
178                            operatingClassLoader);
179    
180                    MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
181                            servletContextName, this);
182            }
183    
184            @Override
185            public void disconnect() {
186                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
187                            return;
188                    }
189    
190                    for (Map.Entry<String, List<MessageListener>> messageListeners :
191                                    _messageListeners.entrySet()) {
192    
193                            String destinationName = messageListeners.getKey();
194    
195                            for (MessageListener messageListener :
196                                            messageListeners.getValue()) {
197    
198                                    _messageBus.unregisterMessageListener(
199                                            destinationName, messageListener);
200                            }
201                    }
202            }
203    
204            @Override
205            public void setDestinationConfigurations(
206                    Set<DestinationConfiguration> destinationConfigurations) {
207    
208                    _destinationConfigurations.addAll(destinationConfigurations);
209            }
210    
211            @Override
212            public void setDestinationEventListeners(
213                    Map<String, List<DestinationEventListener>> destinationEventListeners) {
214    
215                    _destinationEventListeners.putAll(destinationEventListeners);
216            }
217    
218            @Override
219            public void setDestinations(List<Destination> destinations) {
220                    _destinations.addAll(destinations);
221            }
222    
223            @Override
224            public void setMessageBusEventListeners(
225                    List<MessageBusEventListener> messageBusEventListeners) {
226    
227                    _messageBusEventListeners.addAll(messageBusEventListeners);
228            }
229    
230            @Override
231            public void setMessageListeners(
232                    Map<String, List<MessageListener>> messageListeners) {
233    
234                    for (List<MessageListener> messageListenersList :
235                                    messageListeners.values()) {
236    
237                            for (MessageListener messageListener : messageListenersList) {
238                                    Class<?> messageListenerClass = messageListener.getClass();
239    
240                                    try {
241                                            Method setMessageBusMethod = messageListenerClass.getMethod(
242                                                    "setMessageBus", MessageBus.class);
243    
244                                            setMessageBusMethod.setAccessible(true);
245    
246                                            setMessageBusMethod.invoke(messageListener, _messageBus);
247    
248                                            continue;
249                                    }
250                                    catch (Exception e) {
251                                    }
252    
253                                    try {
254                                            Method setMessageBusMethod =
255                                                    messageListenerClass.getDeclaredMethod(
256                                                            "setMessageBus", MessageBus.class);
257    
258                                            setMessageBusMethod.setAccessible(true);
259    
260                                            setMessageBusMethod.invoke(messageListener, _messageBus);
261                                    }
262                                    catch (Exception e) {
263                                    }
264                            }
265                    }
266    
267                    _messageListeners.putAll(messageListeners);
268            }
269    
270            /**
271             * @param      replacementDestinations
272             * @deprecated As of 7.0.0, replaced by {@link #setDestinations(List)}
273             */
274            @Deprecated
275            @Override
276            public void setReplacementDestinations(
277                    List<Destination> replacementDestinations) {
278    
279                    _destinations.addAll(replacementDestinations);
280            }
281    
282            protected abstract ClassLoader getOperatingClassloader();
283    
284            protected void initialize() {
285                    Thread currentThread = Thread.currentThread();
286    
287                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
288    
289                    ClassLoader operatingClassLoader = getOperatingClassloader();
290    
291                    if (contextClassLoader == operatingClassLoader) {
292                            _portalMessagingConfigurator = true;
293                    }
294    
295                    registerMessageBusEventListeners();
296    
297                    registerDestinations();
298    
299                    registerDestinationEventListeners();
300    
301                    connect();
302    
303                    String servletContextName = ClassLoaderPool.getContextName(
304                            operatingClassLoader);
305    
306                    if ((servletContextName != null) &&
307                            !servletContextName.equals(StringPool.NULL)) {
308    
309                            MessagingConfiguratorRegistry.registerMessagingConfigurator(
310                                    servletContextName, this);
311                    }
312            }
313    
314            protected void registerDestinationEventListeners() {
315                    if (_destinationEventListeners.isEmpty()) {
316                            return;
317                    }
318    
319                    Registry registry = RegistryUtil.getRegistry();
320    
321                    _destinationEventListenerServiceRegistrar =
322                            registry.getServiceRegistrar(DestinationEventListener.class);
323    
324                    for (final Map.Entry<String, List<DestinationEventListener>> entry :
325                                    _destinationEventListeners.entrySet()) {
326    
327                            final String destinationName = entry.getKey();
328    
329                            ServiceDependencyManager serviceDependencyManager =
330                                    new ServiceDependencyManager();
331    
332                            serviceDependencyManager.addServiceDependencyListener(
333                                    new ServiceDependencyListener() {
334    
335                                            @Override
336                                            public void dependenciesFulfilled() {
337                                                    Map<String, Object> properties = new HashMap<>();
338    
339                                                    properties.put("destination.name", destinationName);
340    
341                                                    for (DestinationEventListener destinationEventListener :
342                                                                    entry.getValue()) {
343    
344                                                            _destinationEventListenerServiceRegistrar.
345                                                                    registerService(
346                                                                            DestinationEventListener.class,
347                                                                            destinationEventListener, properties);
348                                                    }
349                                            }
350    
351                                            @Override
352                                            public void destroy() {
353                                            }
354    
355                                    });
356    
357                            Filter filter = registry.getFilter(
358                                    "(&(destination.name=" + destinationName + ")(objectClass=" +
359                                            Destination.class.getName() + "))");
360    
361                            serviceDependencyManager.registerDependencies(filter);
362                    }
363            }
364    
365            protected void registerDestinations() {
366                    for (DestinationConfiguration destinationConfiguration :
367                                    _destinationConfigurations) {
368    
369                            try {
370                                    PortalMessageBusPermission.checkListen(
371                                            destinationConfiguration.getDestinationName());
372                            }
373                            catch (SecurityException se) {
374                                    if (_log.isInfoEnabled()) {
375                                            _log.info(
376                                                    "Rejecting destination " +
377                                                            destinationConfiguration.getDestinationName());
378                                    }
379    
380                                    continue;
381                            }
382    
383                            _destinations.add(
384                                    DestinationFactoryUtil.createDestination(
385                                            destinationConfiguration));
386                    }
387    
388                    if (_destinations.isEmpty()) {
389                            return;
390                    }
391    
392                    Registry registry = RegistryUtil.getRegistry();
393    
394                    _destinationServiceRegistrar = registry.getServiceRegistrar(
395                            Destination.class);
396    
397                    for (Destination destination : _destinations) {
398                            String destinationName = destination.getName();
399    
400                            try {
401                                    PortalMessageBusPermission.checkListen(destinationName);
402                            }
403                            catch (SecurityException se) {
404                                    if (_log.isInfoEnabled()) {
405                                            _log.info("Rejecting destination " + destinationName);
406                                    }
407    
408                                    continue;
409                            }
410    
411                            Map<String, Object> properties = new HashMap<>();
412    
413                            properties.put("destination.name", destinationName);
414    
415                            _destinationServiceRegistrar.registerService(
416                                    Destination.class, destination, properties);
417                    }
418            }
419    
420            protected void registerMessageBusEventListeners() {
421                    if (_messageBusEventListeners.isEmpty()) {
422                            return;
423                    }
424    
425                    Registry registry = RegistryUtil.getRegistry();
426    
427                    _messageBusEventListenerServiceRegistrar = registry.getServiceRegistrar(
428                            MessageBusEventListener.class);
429    
430                    for (MessageBusEventListener messageBusEventListener :
431                                    _messageBusEventListeners) {
432    
433                            _messageBusEventListenerServiceRegistrar.registerService(
434                                    MessageBusEventListener.class, messageBusEventListener);
435                    }
436            }
437    
438            private static final Log _log = LogFactoryUtil.getLog(
439                    AbstractMessagingConfigurator.class);
440    
441            private final Set<DestinationConfiguration> _destinationConfigurations =
442                    new HashSet<>();
443            private final Map<String, List<DestinationEventListener>>
444                    _destinationEventListeners = new HashMap<>();
445            private ServiceRegistrar<DestinationEventListener>
446                    _destinationEventListenerServiceRegistrar;
447            private final List<Destination> _destinations = new ArrayList<>();
448            private ServiceRegistrar<Destination> _destinationServiceRegistrar;
449            private volatile MessageBus _messageBus;
450            private final List<MessageBusEventListener> _messageBusEventListeners =
451                    new ArrayList<>();
452            private ServiceRegistrar<MessageBusEventListener>
453                    _messageBusEventListenerServiceRegistrar;
454            private final Map<String, List<MessageListener>> _messageListeners =
455                    new HashMap<>();
456            private ServiceRegistrar<MessageListener> _messageListenerServiceRegistrar;
457            private boolean _portalMessagingConfigurator;
458    
459            private class DestinationServiceDependencyListener
460                    implements ServiceDependencyListener {
461    
462                    public DestinationServiceDependencyListener(
463                            String destinationName, List<MessageListener> messageListeners) {
464    
465                            _destinationName = destinationName;
466                            _messageListeners = messageListeners;
467                    }
468    
469                    @Override
470                    public void dependenciesFulfilled() {
471                            ClassLoader operatingClassLoader = getOperatingClassloader();
472    
473                            if (SPIUtil.isSPI()) {
474                                    SPI spi = SPIUtil.getSPI();
475    
476                                    try {
477                                            RegistrationReference registrationReference =
478                                                    spi.getRegistrationReference();
479    
480                                            IntrabandRPCUtil.execute(
481                                                    registrationReference,
482                                                    new DestinationConfigurationProcessCallable(
483                                                            _destinationName));
484                                    }
485                                    catch (Exception e) {
486                                            StringBundler sb = new StringBundler(4);
487    
488                                            sb.append("Unable to install ");
489                                            sb.append(
490                                                    DestinationConfigurationProcessCallable.class.
491                                                            getName());
492                                            sb.append(" on MPI for ");
493                                            sb.append(_destinationName);
494    
495                                            _log.error(sb.toString(), e);
496                                    }
497                            }
498    
499                            Map<String, Object> properties = new HashMap<>();
500    
501                            properties.put("destination.name", _destinationName);
502                            properties.put(
503                                    "message.listener.operating.class.loader",
504                                    operatingClassLoader);
505    
506                            for (MessageListener messageListener : _messageListeners) {
507                                    _messageListenerServiceRegistrar.registerService(
508                                            MessageListener.class, messageListener, properties);
509                            }
510                    }
511    
512                    @Override
513                    public void destroy() {
514                    }
515    
516                    private final String _destinationName;
517                    private final List<MessageListener> _messageListeners;
518    
519            }
520    
521    }