001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.resiliency.spi.provider;
016    
017    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
018    import com.liferay.portal.kernel.nio.intraband.welder.Welder;
019    import com.liferay.portal.kernel.process.ProcessChannel;
020    import com.liferay.portal.kernel.process.ProcessConfig.Builder;
021    import com.liferay.portal.kernel.process.ProcessExecutorUtil;
022    import com.liferay.portal.kernel.resiliency.PortalResiliencyException;
023    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
024    import com.liferay.portal.kernel.resiliency.spi.SPI;
025    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
026    import com.liferay.portal.kernel.resiliency.spi.remote.RemoteSPI;
027    import com.liferay.portal.kernel.resiliency.spi.remote.RemoteSPIProxy;
028    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
029    import com.liferay.portal.kernel.util.StringBundler;
030    
031    import java.util.concurrent.Callable;
032    import java.util.concurrent.Future;
033    import java.util.concurrent.FutureTask;
034    import java.util.concurrent.SynchronousQueue;
035    import java.util.concurrent.TimeUnit;
036    
037    /**
038     * @author Shuyang Zhou
039     */
040    public abstract class BaseSPIProvider implements SPIProvider {
041    
042            public abstract RemoteSPI createRemoteSPI(SPIConfiguration spiConfiguration)
043                    throws PortalResiliencyException;
044    
045            @Override
046            public SPI createSPI(SPIConfiguration spiConfiguration)
047                    throws PortalResiliencyException {
048    
049                    Builder builder = new Builder();
050    
051                    builder.setArguments(spiConfiguration.getJVMArguments());
052                    builder.setBootstrapClassPath(getClassPath());
053                    builder.setJavaExecutable(spiConfiguration.getJavaExecutable());
054                    builder.setReactClassLoader(PortalClassLoaderUtil.getClassLoader());
055                    builder.setRuntimeClassPath(getClassPath());
056    
057                    RemoteSPI remoteSPI = createRemoteSPI(spiConfiguration);
058    
059                    String spiUUID = remoteSPI.getUUID();
060    
061                    SynchronousQueue<SPI> synchronousQueue =
062                            SPISynchronousQueueUtil.createSynchronousQueue(spiUUID);
063    
064                    FutureTask<RegistrationReference> weldServerFutureTask =
065                            new FutureTask<RegistrationReference>(
066                                    new WeldServerCallable(remoteSPI.getWelder()));
067    
068                    Thread weldServerThread = new Thread(
069                            weldServerFutureTask,
070                            "Weld Server Thread for " + spiConfiguration.getSPIId());
071    
072                    weldServerThread.setDaemon(true);
073    
074                    weldServerThread.start();
075    
076                    try {
077                            ProcessChannel<SPI> processChannel = ProcessExecutorUtil.execute(
078                                    builder.build(), remoteSPI);
079    
080                            Future<SPI> cancelHandlerFuture =
081                                    processChannel.getProcessNoticeableFuture();
082    
083                            SPI spi = synchronousQueue.poll(
084                                    spiConfiguration.getRegisterTimeout(), TimeUnit.MILLISECONDS);
085    
086                            if (spi != null) {
087                                    RegistrationReference registrationReference =
088                                            weldServerFutureTask.get(
089                                                    spiConfiguration.getRegisterTimeout(),
090                                                    TimeUnit.MILLISECONDS);
091    
092                                    RemoteSPIProxy remoteSPIProxy = new RemoteSPIProxy(
093                                            spi, spiConfiguration, getName(), cancelHandlerFuture,
094                                            registrationReference);
095    
096                                    if (!MPIHelperUtil.registerSPI(remoteSPIProxy)) {
097                                            cancelHandlerFuture.cancel(true);
098    
099                                            throw new PortalResiliencyException(
100                                                    "Unable to register SPI " + remoteSPIProxy +
101                                                            ". Forcibly cancelled SPI process launch.");
102                                    }
103    
104                                    return remoteSPIProxy;
105                            }
106    
107                            cancelHandlerFuture.cancel(true);
108    
109                            throw new PortalResiliencyException(
110                                    "SPI synchronous queue waiting timeout. Forcibly " +
111                                            "cancelled SPI process launch.");
112                    }
113                    catch (InterruptedException ie) {
114                            throw new PortalResiliencyException(
115                                    "Interrupted on waiting SPI process, registering back RMI stub",
116                                    ie);
117                    }
118                    catch (PortalResiliencyException pre) {
119                            throw pre;
120                    }
121                    catch (Exception e) {
122                            throw new PortalResiliencyException(
123                                    "Unable to launch SPI process", e);
124                    }
125                    finally {
126                            weldServerFutureTask.cancel(true);
127    
128                            SPISynchronousQueueUtil.destroySynchronousQueue(spiUUID);
129                    }
130            }
131    
132            public abstract String getClassPath();
133    
134            @Override
135            public String toString() {
136                    StringBundler sb = new StringBundler(5);
137    
138                    sb.append("{name=");
139                    sb.append(getName());
140                    sb.append(", classPath=");
141                    sb.append(getClassPath());
142                    sb.append("}");
143    
144                    return sb.toString();
145            }
146    
147            protected static class WeldServerCallable
148                    implements Callable<RegistrationReference> {
149    
150                    public WeldServerCallable(Welder welder) {
151                            _welder = welder;
152                    }
153    
154                    @Override
155                    public RegistrationReference call() throws Exception {
156                            return _welder.weld(MPIHelperUtil.getIntraband());
157                    }
158    
159                    private final Welder _welder;
160    
161            }
162    
163    }