001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging.config;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.messaging.Destination;
020    import com.liferay.portal.kernel.messaging.DestinationConfiguration;
021    import com.liferay.portal.kernel.messaging.DestinationEventListener;
022    import com.liferay.portal.kernel.messaging.DestinationFactory;
023    import com.liferay.portal.kernel.messaging.MessageBus;
024    import com.liferay.portal.kernel.messaging.MessageBusEventListener;
025    import com.liferay.portal.kernel.messaging.MessageListener;
026    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
027    import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
028    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
029    import com.liferay.portal.kernel.resiliency.spi.SPI;
030    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
031    import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
032    import com.liferay.portal.kernel.util.ClassLoaderPool;
033    import com.liferay.portal.kernel.util.StringBundler;
034    import com.liferay.registry.Filter;
035    import com.liferay.registry.Registry;
036    import com.liferay.registry.RegistryUtil;
037    import com.liferay.registry.ServiceFinalizer;
038    import com.liferay.registry.ServiceReference;
039    import com.liferay.registry.ServiceRegistrar;
040    import com.liferay.registry.dependency.ServiceDependencyListener;
041    import com.liferay.registry.dependency.ServiceDependencyManager;
042    
043    import java.lang.reflect.Method;
044    
045    import java.util.ArrayList;
046    import java.util.HashMap;
047    import java.util.HashSet;
048    import java.util.List;
049    import java.util.Map;
050    import java.util.Set;
051    
052    /**
053     * @author Michael C. Han
054     */
055    public abstract class AbstractMessagingConfigurator
056            implements MessagingConfigurator {
057    
058            public void afterPropertiesSet() {
059                    final ServiceDependencyManager serviceDependencyManager =
060                            new ServiceDependencyManager();
061    
062                    serviceDependencyManager.addServiceDependencyListener(
063                            new ServiceDependencyListener() {
064    
065                                    @Override
066                                    public void dependenciesFulfilled() {
067                                            Registry registry = RegistryUtil.getRegistry();
068    
069                                            _messageBus = registry.getService(MessageBus.class);
070    
071                                            initialize();
072                                    }
073    
074                                    @Override
075                                    public void destroy() {
076                                    }
077    
078                            });
079    
080                    serviceDependencyManager.registerDependencies(
081                            DestinationFactory.class, MessageBus.class);
082            }
083    
084            @Override
085            public void connect() {
086                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
087                            return;
088                    }
089    
090                    Registry registry = RegistryUtil.getRegistry();
091    
092                    _messageListenerServiceRegistrar = registry.getServiceRegistrar(
093                            MessageListener.class);
094    
095                    Thread currentThread = Thread.currentThread();
096    
097                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
098    
099                    try {
100                            ClassLoader operatingClassLoader = getOperatingClassloader();
101    
102                            currentThread.setContextClassLoader(operatingClassLoader);
103    
104                            for (Map.Entry<String, List<MessageListener>> messageListeners :
105                                            _messageListeners.entrySet()) {
106    
107                                    String destinationName = messageListeners.getKey();
108    
109                                    ServiceDependencyManager serviceDependencyManager =
110                                            new ServiceDependencyManager();
111    
112                                    serviceDependencyManager.addServiceDependencyListener(
113                                            new DestinationServiceDependencyListener(
114                                                    destinationName, messageListeners.getValue()));
115    
116                                    Filter filter = registry.getFilter(
117                                            "(&(destination.name=" + destinationName +
118                                                    ")(objectClass=" + Destination.class.getName() + "))");
119    
120                                    serviceDependencyManager.registerDependencies(filter);
121                            }
122                    }
123                    finally {
124                            currentThread.setContextClassLoader(contextClassLoader);
125                    }
126            }
127    
128            @Override
129            public void destroy() {
130                    disconnect();
131    
132                    _messageBusEventListeners.clear();
133    
134                    if (_messageBusEventListenerServiceRegistrar != null) {
135                            _messageBusEventListenerServiceRegistrar.destroy();
136                    }
137    
138                    _destinationConfigurations.clear();
139    
140                    if (_destinationConfigurationServiceRegistrar != null) {
141                            _destinationConfigurationServiceRegistrar.destroy();
142                    }
143    
144                    _destinations.clear();
145    
146                    if (_destinationServiceRegistrar != null) {
147                            _destinationServiceRegistrar.destroy(
148                                    new ServiceFinalizer<Destination>() {
149    
150                                            @Override
151                                            public void finalize(
152                                                    ServiceReference<Destination> serviceReference,
153                                                    Destination destination) {
154    
155                                                    destination.close();
156    
157                                                    destination.removeDestinationEventListeners();
158    
159                                                    destination.unregisterMessageListeners();
160                                            }
161    
162                                    });
163                    }
164    
165                    _messageListeners.clear();
166    
167                    if (_messageListenerServiceRegistrar != null) {
168                            _messageListenerServiceRegistrar.destroy();
169                    }
170    
171                    _destinationEventListeners.clear();
172    
173                    if (_destinationEventListenerServiceRegistrar != null) {
174                            _destinationEventListenerServiceRegistrar.destroy();
175                    }
176    
177                    ClassLoader operatingClassLoader = getOperatingClassloader();
178    
179                    String servletContextName = ClassLoaderPool.getContextName(
180                            operatingClassLoader);
181    
182                    MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
183                            servletContextName, this);
184            }
185    
186            @Override
187            public void disconnect() {
188                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
189                            return;
190                    }
191    
192                    for (Map.Entry<String, List<MessageListener>> messageListeners :
193                                    _messageListeners.entrySet()) {
194    
195                            String destinationName = messageListeners.getKey();
196    
197                            for (MessageListener messageListener :
198                                            messageListeners.getValue()) {
199    
200                                    _messageBus.unregisterMessageListener(
201                                            destinationName, messageListener);
202                            }
203                    }
204            }
205    
206            @Override
207            public void setDestinationConfigurations(
208                    Set<DestinationConfiguration> destinationConfigurations) {
209    
210                    _destinationConfigurations.addAll(destinationConfigurations);
211            }
212    
213            @Override
214            public void setDestinationEventListeners(
215                    Map<String, List<DestinationEventListener>> destinationEventListeners) {
216    
217                    _destinationEventListeners.putAll(destinationEventListeners);
218            }
219    
220            @Override
221            public void setDestinations(List<Destination> destinations) {
222                    _destinations.addAll(destinations);
223            }
224    
225            @Override
226            public void setMessageBusEventListeners(
227                    List<MessageBusEventListener> messageBusEventListeners) {
228    
229                    _messageBusEventListeners.addAll(messageBusEventListeners);
230            }
231    
232            @Override
233            public void setMessageListeners(
234                    Map<String, List<MessageListener>> messageListeners) {
235    
236                    for (List<MessageListener> messageListenersList :
237                                    messageListeners.values()) {
238    
239                            for (MessageListener messageListener : messageListenersList) {
240                                    Class<?> messageListenerClass = messageListener.getClass();
241    
242                                    try {
243                                            Method setMessageBusMethod = messageListenerClass.getMethod(
244                                                    "setMessageBus", MessageBus.class);
245    
246                                            setMessageBusMethod.setAccessible(true);
247    
248                                            setMessageBusMethod.invoke(messageListener, _messageBus);
249    
250                                            continue;
251                                    }
252                                    catch (Exception e) {
253                                    }
254    
255                                    try {
256                                            Method setMessageBusMethod =
257                                                    messageListenerClass.getDeclaredMethod(
258                                                            "setMessageBus", MessageBus.class);
259    
260                                            setMessageBusMethod.setAccessible(true);
261    
262                                            setMessageBusMethod.invoke(messageListener, _messageBus);
263                                    }
264                                    catch (Exception e) {
265                                    }
266                            }
267                    }
268    
269                    _messageListeners.putAll(messageListeners);
270            }
271    
272            /**
273             * @deprecated As of 7.0.0, replaced by {@link #setDestinations(List)}
274             *
275             * @param replacementDestinations
276             */
277            @Deprecated
278            @Override
279            public void setReplacementDestinations(
280                    List<Destination> replacementDestinations) {
281    
282                    _destinations.addAll(replacementDestinations);
283            }
284    
        /**
         * Returns the class loader this configurator operates with.
         *
         * <p>
         * Within this class the returned class loader is installed as the thread
         * context class loader while {@link #connect()} registers its listener
         * dependencies, is compared against the thread context class loader in
         * {@link #initialize()}, is passed to
         * <code>ClassLoaderPool.getContextName</code> to obtain the name under
         * which this configurator is registered and unregistered, and is attached
         * to every message listener registration as the
         * <code>message.listener.operating.class.loader</code> property.
         * </p>
         *
         * @return the operating class loader
         */
        protected abstract ClassLoader getOperatingClassloader();
286    
287            protected void initialize() {
288                    Thread currentThread = Thread.currentThread();
289    
290                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
291    
292                    ClassLoader operatingClassLoader = getOperatingClassloader();
293    
294                    if (contextClassLoader == operatingClassLoader) {
295                            _portalMessagingConfigurator = true;
296                    }
297    
298                    registerMessageBusEventListeners();
299    
300                    registerDestinationConfigurations();
301    
302                    registerDestinations();
303    
304                    registerDestinationEventListeners();
305    
306                    connect();
307    
308                    String servletContextName = ClassLoaderPool.getContextName(
309                            operatingClassLoader);
310    
311                    MessagingConfiguratorRegistry.registerMessagingConfigurator(
312                            servletContextName, this);
313            }
314    
315            protected void registerDestinationConfigurations() {
316                    if (_destinationConfigurations.isEmpty()) {
317                            return;
318                    }
319    
320                    Registry registry = RegistryUtil.getRegistry();
321    
322                    _destinationConfigurationServiceRegistrar =
323                            registry.getServiceRegistrar(DestinationConfiguration.class);
324    
325                    for (DestinationConfiguration destinationConfiguration :
326                                    _destinationConfigurations) {
327    
328                            try {
329                                    PortalMessageBusPermission.checkListen(
330                                            destinationConfiguration.getDestinationName());
331                            }
332                            catch (SecurityException se) {
333                                    if (_log.isInfoEnabled()) {
334                                            _log.info(
335                                                    "Rejecting destination " +
336                                                            destinationConfiguration.getDestinationName());
337                                    }
338    
339                                    continue;
340                            }
341    
342                            Map<String, Object> properties = new HashMap<>();
343    
344                            properties.put(
345                                    "destination.name",
346                                    destinationConfiguration.getDestinationName());
347    
348                            _destinationConfigurationServiceRegistrar.registerService(
349                                    DestinationConfiguration.class, destinationConfiguration,
350                                    properties);
351                    }
352            }
353    
354            protected void registerDestinationEventListeners() {
355                    if (_destinationEventListeners.isEmpty()) {
356                            return;
357                    }
358    
359                    Registry registry = RegistryUtil.getRegistry();
360    
361                    _destinationEventListenerServiceRegistrar =
362                            registry.getServiceRegistrar(DestinationEventListener.class);
363    
364                    for (final Map.Entry<String, List<DestinationEventListener>> entry :
365                                    _destinationEventListeners.entrySet()) {
366    
367                            final String destinationName = entry.getKey();
368    
369                            ServiceDependencyManager serviceDependencyManager =
370                                    new ServiceDependencyManager();
371    
372                            serviceDependencyManager.addServiceDependencyListener(
373                                    new ServiceDependencyListener() {
374    
375                                            @Override
376                                            public void dependenciesFulfilled() {
377                                                    Map<String, Object> properties = new HashMap<>();
378    
379                                                    properties.put("destination.name", destinationName);
380    
381                                                    for (DestinationEventListener destinationEventListener :
382                                                                    entry.getValue()) {
383    
384                                                            _destinationEventListenerServiceRegistrar.
385                                                                    registerService(
386                                                                            DestinationEventListener.class,
387                                                                            destinationEventListener, properties);
388                                                    }
389                                            }
390    
391                                            @Override
392                                            public void destroy() {
393                                            }
394    
395                                    });
396    
397                            Filter filter = registry.getFilter(
398                                    "(&(destination.name=" + destinationName + ")(objectClass=" +
399                                            Destination.class.getName() + "))");
400    
401                            serviceDependencyManager.registerDependencies(filter);
402                    }
403            }
404    
405            protected void registerDestinations() {
406                    if (_destinations.isEmpty()) {
407                            return;
408                    }
409    
410                    Registry registry = RegistryUtil.getRegistry();
411    
412                    _destinationServiceRegistrar = registry.getServiceRegistrar(
413                            Destination.class);
414    
415                    for (Destination destination : _destinations) {
416                            String destinationName = destination.getName();
417    
418                            try {
419                                    PortalMessageBusPermission.checkListen(destinationName);
420                            }
421                            catch (SecurityException se) {
422                                    if (_log.isInfoEnabled()) {
423                                            _log.info("Rejecting destination " + destinationName);
424                                    }
425    
426                                    continue;
427                            }
428    
429                            Map<String, Object> properties = new HashMap<>();
430    
431                            properties.put("destination.name", destinationName);
432    
433                            _destinationServiceRegistrar.registerService(
434                                    Destination.class, destination, properties);
435                    }
436            }
437    
438            protected void registerMessageBusEventListeners() {
439                    if (_messageBusEventListeners.isEmpty()) {
440                            return;
441                    }
442    
443                    Registry registry = RegistryUtil.getRegistry();
444    
445                    _messageBusEventListenerServiceRegistrar = registry.getServiceRegistrar(
446                            MessageBusEventListener.class);
447    
448                    for (MessageBusEventListener messageBusEventListener :
449                                    _messageBusEventListeners) {
450    
451                            _messageBusEventListenerServiceRegistrar.registerService(
452                                    MessageBusEventListener.class, messageBusEventListener);
453                    }
454            }
455    
        // Logger shared by this class and its inner dependency listener

        private static final Log _log = LogFactoryUtil.getLog(
                AbstractMessagingConfigurator.class);

        // Destination configurations collected by setDestinationConfigurations
        // and published by registerDestinationConfigurations

        private final Set<DestinationConfiguration> _destinationConfigurations =
                new HashSet<>();

        // Created in registerDestinationConfigurations; stays null when no
        // destination configurations are configured

        private ServiceRegistrar<DestinationConfiguration>
                _destinationConfigurationServiceRegistrar;

        // Destination event listeners keyed by destination name, collected by
        // setDestinationEventListeners

        private final Map<String, List<DestinationEventListener>>
                _destinationEventListeners = new HashMap<>();

        // Created in registerDestinationEventListeners; stays null when no
        // destination event listeners are configured

        private ServiceRegistrar<DestinationEventListener>
                _destinationEventListenerServiceRegistrar;

        // Destinations collected by setDestinations and
        // setReplacementDestinations, published by registerDestinations

        private final List<Destination> _destinations = new ArrayList<>();

        // Created in registerDestinations; stays null when no destinations are
        // configured

        private ServiceRegistrar<Destination> _destinationServiceRegistrar;

        // Assigned from the registry when the dependencies registered in
        // afterPropertiesSet are fulfilled; null until then

        private volatile MessageBus _messageBus;

        // Message bus event listeners collected by setMessageBusEventListeners

        private final List<MessageBusEventListener> _messageBusEventListeners =
                new ArrayList<>();

        // Created in registerMessageBusEventListeners; stays null when no
        // message bus event listeners are configured

        private ServiceRegistrar<MessageBusEventListener>
                _messageBusEventListenerServiceRegistrar;

        // Message listeners keyed by destination name, collected by
        // setMessageListeners

        private final Map<String, List<MessageListener>> _messageListeners =
                new HashMap<>();

        // Created in connect; used by DestinationServiceDependencyListener to
        // register the message listeners

        private ServiceRegistrar<MessageListener> _messageListenerServiceRegistrar;

        // Set to true in initialize when the thread context class loader is the
        // operating class loader; makes connect and disconnect no-ops inside an
        // SPI

        private boolean _portalMessagingConfigurator;
478    
479            private class DestinationServiceDependencyListener
480                    implements ServiceDependencyListener {
481    
482                    public DestinationServiceDependencyListener(
483                            String destinationName, List<MessageListener> messageListeners) {
484    
485                            _destinationName = destinationName;
486                            _messageListeners = messageListeners;
487                    }
488    
489                    @Override
490                    public void dependenciesFulfilled() {
491                            ClassLoader operatingClassLoader = getOperatingClassloader();
492    
493                            if (SPIUtil.isSPI()) {
494                                    SPI spi = SPIUtil.getSPI();
495    
496                                    try {
497                                            RegistrationReference registrationReference =
498                                                    spi.getRegistrationReference();
499    
500                                            IntrabandRPCUtil.execute(
501                                                    registrationReference,
502                                                    new DestinationConfigurationProcessCallable(
503                                                            _destinationName));
504                                    }
505                                    catch (Exception e) {
506                                            StringBundler sb = new StringBundler(4);
507    
508                                            sb.append("Unable to install ");
509                                            sb.append(
510                                                    DestinationConfigurationProcessCallable.class.
511                                                            getName());
512                                            sb.append(" on MPI for ");
513                                            sb.append(_destinationName);
514    
515                                            _log.error(sb.toString(), e);
516                                    }
517                            }
518    
519                            Map<String, Object> properties = new HashMap<>();
520    
521                            properties.put("destination.name", _destinationName);
522                            properties.put(
523                                    "message.listener.operating.class.loader",
524                                    operatingClassLoader);
525    
526                            for (MessageListener messageListener : _messageListeners) {
527                                    _messageListenerServiceRegistrar.registerService(
528                                            MessageListener.class, messageListener, properties);
529                            }
530                    }
531    
532                    @Override
533                    public void destroy() {
534                    }
535    
536                    private final String _destinationName;
537                    private final List<MessageListener> _messageListeners;
538    
539            }
540    
541    }