001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.resiliency.mpi;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.messaging.config.MessagingConfigurator;
020    import com.liferay.portal.kernel.messaging.config.MessagingConfiguratorRegistry;
021    import com.liferay.portal.kernel.nio.intraband.Intraband;
022    import com.liferay.portal.kernel.nio.intraband.IntrabandFactoryUtil;
023    import com.liferay.portal.kernel.nio.intraband.SystemDataType;
024    import com.liferay.portal.kernel.nio.intraband.rpc.BootstrapRPCDatagramReceiveHandler;
025    import com.liferay.portal.kernel.resiliency.spi.SPI;
026    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
027    import com.liferay.portal.kernel.resiliency.spi.SPIRegistryUtil;
028    import com.liferay.portal.kernel.resiliency.spi.provider.SPIProvider;
029    import com.liferay.portal.kernel.util.CentralizedThreadLocal;
030    import com.liferay.portal.kernel.util.PropsKeys;
031    import com.liferay.portal.kernel.util.PropsUtil;
032    
033    import java.rmi.NoSuchObjectException;
034    import java.rmi.RemoteException;
035    import java.rmi.server.UnicastRemoteObject;
036    
037    import java.util.ArrayList;
038    import java.util.Collection;
039    import java.util.Iterator;
040    import java.util.List;
041    import java.util.concurrent.ConcurrentHashMap;
042    import java.util.concurrent.ConcurrentMap;
043    
044    /**
045     * @author Shuyang Zhou
046     */
047    public class MPIHelperUtil {
048    
049            public static SPI checkSPILiveness(SPI spi) {
050                    boolean alive = false;
051    
052                    try {
053                            alive = spi.isAlive();
054                    }
055                    catch (RemoteException re) {
056                            _log.error(re);
057                    }
058    
059                    if (alive) {
060                            return spi;
061                    }
062    
063                    unregisterSPI(spi);
064    
065                    return null;
066            }
067    
068            public static Intraband getIntraband() {
069                    return _intraband;
070            }
071    
072            public static MPI getMPI() {
073                    return _mpi;
074            }
075    
076            public static SPI getSPI(String spiProviderName, String spiId) {
077                    SPIProviderContainer spiProviderContainer = _spiProviderContainers.get(
078                            spiProviderName);
079    
080                    if (spiProviderContainer == null) {
081                            return null;
082                    }
083    
084                    SPI spi = spiProviderContainer.getSPI(spiId);
085    
086                    if (spi != null) {
087                            spi = checkSPILiveness(spi);
088                    }
089    
090                    return spi;
091            }
092    
093            public static SPIProvider getSPIProvider(String spiProviderName) {
094                    SPIProviderContainer spiProviderContainer = _spiProviderContainers.get(
095                            spiProviderName);
096    
097                    if (spiProviderContainer == null) {
098                            return null;
099                    }
100    
101                    return spiProviderContainer.getSPIProvider();
102            }
103    
104            public static List<SPIProvider> getSPIProviders() {
105                    List<SPIProvider> spiProviders = new ArrayList<>();
106    
107                    for (SPIProviderContainer spiProviderContainer :
108                                    _spiProviderContainers.values()) {
109    
110                            spiProviders.add(spiProviderContainer.getSPIProvider());
111                    }
112    
113                    return spiProviders;
114            }
115    
116            public static List<SPI> getSPIs() {
117                    List<SPI> spis = new ArrayList<>();
118    
119                    for (SPIProviderContainer spiProviderContainer :
120                                    _spiProviderContainers.values()) {
121    
122                            for (SPI spi : spiProviderContainer.getSPIs()) {
123                                    spi = checkSPILiveness(spi);
124    
125                                    if (spi != null) {
126                                            spis.add(spi);
127                                    }
128                            }
129                    }
130    
131                    return spis;
132            }
133    
134            public static List<SPI> getSPIs(String spiProviderName) {
135                    List<SPI> spis = new ArrayList<>();
136    
137                    SPIProviderContainer spiProviderContainer = _spiProviderContainers.get(
138                            spiProviderName);
139    
140                    if (spiProviderContainer != null) {
141                            for (SPI spi : spiProviderContainer.getSPIs()) {
142                                    spi = checkSPILiveness(spi);
143    
144                                    if (spi != null) {
145                                            spis.add(spi);
146                                    }
147                            }
148                    }
149    
150                    return spis;
151            }
152    
153            public static boolean registerSPI(SPI spi) {
154                    try {
155                            MPI mpi = spi.getMPI();
156    
157                            if (mpi != _mpi) {
158                                    if (_log.isWarnEnabled()) {
159                                            _log.warn(
160                                                    "Not registering SPI " + spi + " with foreign MPI " +
161                                                            mpi + " versus " + _mpi);
162                                    }
163    
164                                    return false;
165                            }
166    
167                            String spiProviderName = spi.getSPIProviderName();
168    
169                            SPIProviderContainer spiProviderContainer =
170                                    _spiProviderContainers.get(spiProviderName);
171    
172                            if (spiProviderContainer == null) {
173                                    if (_log.isWarnEnabled()) {
174                                            _log.warn(
175                                                    "Not registering SPI " + spi +
176                                                            " with unknown SPI provider " + spiProviderName);
177                                    }
178    
179                                    return false;
180                            }
181    
182                            SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
183    
184                            SPI previousSPI = spiProviderContainer.putSPIIfAbsent(
185                                    spiConfiguration.getSPIId(), spi);
186    
187                            if (previousSPI != null) {
188                                    if (_log.isWarnEnabled()) {
189                                            _log.warn(
190                                                    "Not registering SPI " + spi +
191                                                            " because it duplicates " + previousSPI);
192                                    }
193    
194                                    return false;
195                            }
196    
197                            SPIRegistryUtil.registerSPI(spi);
198    
199                            for (String servletContextName :
200                                            spiConfiguration.getServletContextNames()) {
201    
202                                    List<MessagingConfigurator> messagingConfigurators =
203                                            MessagingConfiguratorRegistry.getMessagingConfigurators(
204                                                    servletContextName);
205    
206                                    if (messagingConfigurators != null) {
207                                            for (MessagingConfigurator messagingConfigurator :
208                                                            messagingConfigurators) {
209    
210                                                    messagingConfigurator.disconnect();
211                                            }
212                                    }
213                            }
214    
215                            if (_log.isInfoEnabled()) {
216                                    _log.info("Registered SPI " + spi);
217                            }
218    
219                            return true;
220                    }
221                    catch (RemoteException re) {
222                            throw new RuntimeException(re);
223                    }
224            }
225    
226            public static boolean registerSPIProvider(SPIProvider spiProvider) {
227                    String spiProviderName = spiProvider.getName();
228    
229                    SPIProviderContainer previousSPIProviderContainer =
230                            _spiProviderContainers.putIfAbsent(
231                                    spiProviderName, new SPIProviderContainer(spiProvider));
232    
233                    if (previousSPIProviderContainer != null) {
234                            if (_log.isWarnEnabled()) {
235                                    _log.warn(
236                                            "Not registering SPI provider " + spiProvider +
237                                                    " because it duplicates " +
238                                                            previousSPIProviderContainer.getSPIProvider());
239                            }
240    
241                            return false;
242                    }
243    
244                    if (_log.isInfoEnabled()) {
245                            _log.info("Registered SPI provider " + spiProvider);
246                    }
247    
248                    return true;
249            }
250    
251            public static void shutdown() {
252                    try {
253                            UnicastRemoteObject.unexportObject(_mpiImpl, true);
254                    }
255                    catch (NoSuchObjectException nsoe) {
256                            if (_log.isWarnEnabled()) {
257                                    _log.warn("Unable to unexport " + _mpiImpl, nsoe);
258                            }
259                    }
260    
261                    try {
262                            _intraband.close();
263                    }
264                    catch (Exception e) {
265                            if (_log.isWarnEnabled()) {
266                                    _log.warn("Unable to close intraband", e);
267                            }
268                    }
269            }
270    
271            public static boolean unregisterSPI(SPI spi) {
272                    try {
273                            if (spi == _unregisteringSPIThreadLocal.get()) {
274                                    _doUnregisterSPI(spi);
275    
276                                    return true;
277                            }
278    
279                            MPI mpi = spi.getMPI();
280    
281                            if (mpi != _mpi) {
282                                    if (_log.isWarnEnabled()) {
283                                            _log.warn(
284                                                    "Not unregistering SPI " + spi + " with foreign MPI " +
285                                                            mpi + " versus " + _mpi);
286                                    }
287    
288                                    return false;
289                            }
290    
291                            String spiProviderName = spi.getSPIProviderName();
292    
293                            SPIProviderContainer spiProviderContainer =
294                                    _spiProviderContainers.get(spiProviderName);
295    
296                            if (spiProviderContainer == null) {
297                                    if (_log.isWarnEnabled()) {
298                                            _log.warn(
299                                                    "Not unregistering SPI " + spi +
300                                                            " with unknown SPI provider " + spiProviderName);
301                                    }
302    
303                                    return false;
304                            }
305    
306                            SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
307    
308                            if (spiProviderContainer.removeSPI(
309                                            spiConfiguration.getSPIId(), spi)) {
310    
311                                    _doUnregisterSPI(spi);
312    
313                                    return true;
314                            }
315    
316                            if (_log.isWarnEnabled()) {
317                                    _log.warn("Not unregistering unregistered SPI " + spi);
318                            }
319    
320                            return false;
321                    }
322                    catch (RemoteException re) {
323                            throw new RuntimeException(re);
324                    }
325            }
326    
327            public static boolean unregisterSPIProvider(SPIProvider spiProvider) {
328                    String spiProviderName = spiProvider.getName();
329    
330                    SPIProviderContainer spiProviderContainer = _spiProviderContainers.get(
331                            spiProviderName);
332    
333                    if ((spiProviderContainer != null) &&
334                            (spiProviderContainer.getSPIProvider() == spiProvider) &&
335                            _spiProviderContainers.remove(
336                                    spiProviderName, spiProviderContainer)) {
337    
338                            Collection<SPI> spis = spiProviderContainer.getSPIs();
339    
340                            Iterator<SPI> iterator = spis.iterator();
341    
342                            while (iterator.hasNext()) {
343                                    SPI spi = iterator.next();
344    
345                                    iterator.remove();
346    
347                                    _unregisteringSPIThreadLocal.set(spi);
348    
349                                    try {
350                                            spi.stop();
351    
352                                            spi.destroy();
353    
354                                            if (_log.isInfoEnabled()) {
355                                                    _log.info(
356                                                            "Unregistered SPI " + spi +
357                                                                    " while unregistering SPI provider " +
358                                                                            spiProvider);
359                                            }
360                                    }
361                                    catch (RemoteException re) {
362                                            _log.error(
363                                                    "Unable to unregister SPI " + spi +
364                                                            " while unregistering SPI provider " + spiProvider,
365                                                    re);
366                                    }
367                                    finally {
368                                            _unregisteringSPIThreadLocal.remove();
369                                    }
370                            }
371    
372                            if (_log.isInfoEnabled()) {
373                                    _log.info("Unregistered SPI provider " + spiProvider);
374                            }
375    
376                            return true;
377                    }
378    
379                    if (_log.isWarnEnabled()) {
380                            _log.warn(
381                                    "Not unregistering unregistered SPI provider " + spiProvider);
382                    }
383    
384                    return false;
385            }
386    
387            private static void _doUnregisterSPI(SPI spi) throws RemoteException {
388                    SPIRegistryUtil.unregisterSPI(spi);
389    
390                    SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
391    
392                    for (String servletContextName :
393                                    spiConfiguration.getServletContextNames()) {
394    
395                            List<MessagingConfigurator> messagingConfigurators =
396                                    MessagingConfiguratorRegistry.getMessagingConfigurators(
397                                            servletContextName);
398    
399                            if (messagingConfigurators == null) {
400                                    continue;
401                            }
402    
403                            for (MessagingConfigurator messagingConfigurator :
404                                            messagingConfigurators) {
405    
406                                    messagingConfigurator.connect();
407                            }
408                    }
409    
410                    if (_log.isInfoEnabled()) {
411                            _log.info("Unregistered SPI " + spi);
412                    }
413            }
414    
415            private static final Log _log = LogFactoryUtil.getLog(MPIHelperUtil.class);
416    
417            private static final Intraband _intraband;
418            private static final MPI _mpi;
419            private static final MPI _mpiImpl;
420            private static final ConcurrentMap<String, SPIProviderContainer>
421                    _spiProviderContainers = new ConcurrentHashMap<>();
422            private static final ThreadLocal<SPI> _unregisteringSPIThreadLocal =
423                    new CentralizedThreadLocal<>(true);
424    
425            static {
426    
427                    // Keep strong reference to prevent garbage collection
428    
429                    _mpiImpl = new MPIImpl();
430    
431                    try {
432                            if (PropsUtil.getProps() != null) {
433                                    System.setProperty(
434                                            PropsKeys.INTRABAND_IMPL,
435                                            PropsUtil.get(PropsKeys.INTRABAND_IMPL));
436                                    System.setProperty(
437                                            PropsKeys.INTRABAND_TIMEOUT_DEFAULT,
438                                            PropsUtil.get(PropsKeys.INTRABAND_TIMEOUT_DEFAULT));
439                                    System.setProperty(
440                                            PropsKeys.INTRABAND_WELDER_IMPL,
441                                            PropsUtil.get(PropsKeys.INTRABAND_WELDER_IMPL));
442                            }
443    
444                            _intraband = IntrabandFactoryUtil.createIntraband();
445    
446                            _intraband.registerDatagramReceiveHandler(
447                                    SystemDataType.RPC.getValue(),
448                                    new BootstrapRPCDatagramReceiveHandler());
449    
450                            _mpi = (MPI)UnicastRemoteObject.exportObject(_mpiImpl, 0);
451                    }
452                    catch (Exception e) {
453                            throw new ExceptionInInitializerError(e);
454                    }
455            }
456    
457            private static class MPIImpl implements MPI {
458    
459                    @Override
460                    public boolean isAlive() {
461                            return true;
462                    }
463    
464            }
465    
466            private static class SPIProviderContainer {
467    
468                    public SPIProviderContainer(SPIProvider spiProvider) {
469                            _spiProvider = spiProvider;
470                    }
471    
472                    public SPI getSPI(String spiId) {
473                            return _spis.get(spiId);
474                    }
475    
476                    public SPIProvider getSPIProvider() {
477                            return _spiProvider;
478                    }
479    
480                    public Collection<SPI> getSPIs() {
481                            return _spis.values();
482                    }
483    
484                    public SPI putSPIIfAbsent(String spiId, SPI spi) {
485                            return _spis.putIfAbsent(spiId, spi);
486                    }
487    
488                    public boolean removeSPI(String spiId, SPI spi) {
489                            return _spis.remove(spiId, spi);
490                    }
491    
492                    private final SPIProvider _spiProvider;
493                    private final ConcurrentMap<String, SPI> _spis =
494                            new ConcurrentHashMap<>();
495    
496            }
497    
498    }