001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.resiliency.spi.provider;
016    
017    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
018    import com.liferay.portal.kernel.nio.intraband.welder.Welder;
019    import com.liferay.portal.kernel.process.ProcessChannel;
020    import com.liferay.portal.kernel.process.ProcessConfig.Builder;
021    import com.liferay.portal.kernel.process.ProcessExecutorUtil;
022    import com.liferay.portal.kernel.resiliency.PortalResiliencyException;
023    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
024    import com.liferay.portal.kernel.resiliency.spi.SPI;
025    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
026    import com.liferay.portal.kernel.resiliency.spi.remote.RemoteSPI;
027    import com.liferay.portal.kernel.resiliency.spi.remote.RemoteSPIProxy;
028    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
029    import com.liferay.portal.kernel.util.StringBundler;
030    
031    import java.util.concurrent.Callable;
032    import java.util.concurrent.Future;
033    import java.util.concurrent.FutureTask;
034    import java.util.concurrent.SynchronousQueue;
035    import java.util.concurrent.TimeUnit;
036    
037    /**
038     * @author Shuyang Zhou
039     */
040    public abstract class BaseSPIProvider implements SPIProvider {
041    
042            public abstract RemoteSPI createRemoteSPI(SPIConfiguration spiConfiguration)
043                    throws PortalResiliencyException;
044    
045            @Override
046            public SPI createSPI(SPIConfiguration spiConfiguration)
047                    throws PortalResiliencyException {
048    
049                    Builder builder = new Builder();
050    
051                    builder.setArguments(spiConfiguration.getJVMArguments());
052                    builder.setBootstrapClassPath(getClassPath());
053                    builder.setJavaExecutable(spiConfiguration.getJavaExecutable());
054                    builder.setReactClassLoader(PortalClassLoaderUtil.getClassLoader());
055                    builder.setRuntimeClassPath(getClassPath());
056    
057                    RemoteSPI remoteSPI = createRemoteSPI(spiConfiguration);
058    
059                    String spiUUID = remoteSPI.getUUID();
060    
061                    SynchronousQueue<SPI> synchronousQueue =
062                            SPISynchronousQueueUtil.createSynchronousQueue(spiUUID);
063    
064                    FutureTask<RegistrationReference> weldServerFutureTask =
065                            new FutureTask<>(new WeldServerCallable(remoteSPI.getWelder()));
066    
067                    Thread weldServerThread = new Thread(
068                            weldServerFutureTask,
069                            "Weld Server Thread for " + spiConfiguration.getSPIId());
070    
071                    weldServerThread.setDaemon(true);
072    
073                    weldServerThread.start();
074    
075                    try {
076                            ProcessChannel<SPI> processChannel = ProcessExecutorUtil.execute(
077                                    builder.build(), remoteSPI);
078    
079                            Future<SPI> cancelHandlerFuture =
080                                    processChannel.getProcessNoticeableFuture();
081    
082                            SPI spi = synchronousQueue.poll(
083                                    spiConfiguration.getRegisterTimeout(), TimeUnit.MILLISECONDS);
084    
085                            if (spi != null) {
086                                    RegistrationReference registrationReference =
087                                            weldServerFutureTask.get(
088                                                    spiConfiguration.getRegisterTimeout(),
089                                                    TimeUnit.MILLISECONDS);
090    
091                                    RemoteSPIProxy remoteSPIProxy = new RemoteSPIProxy(
092                                            spi, spiConfiguration, getName(), cancelHandlerFuture,
093                                            registrationReference);
094    
095                                    if (!MPIHelperUtil.registerSPI(remoteSPIProxy)) {
096                                            cancelHandlerFuture.cancel(true);
097    
098                                            throw new PortalResiliencyException(
099                                                    "Unable to register SPI " + remoteSPIProxy +
100                                                            ". Forcibly cancelled SPI process launch.");
101                                    }
102    
103                                    return remoteSPIProxy;
104                            }
105    
106                            cancelHandlerFuture.cancel(true);
107    
108                            throw new PortalResiliencyException(
109                                    "SPI synchronous queue waiting timeout. Forcibly " +
110                                            "cancelled SPI process launch.");
111                    }
112                    catch (InterruptedException ie) {
113                            throw new PortalResiliencyException(
114                                    "Interrupted on waiting SPI process, registering back RMI stub",
115                                    ie);
116                    }
117                    catch (PortalResiliencyException pre) {
118                            throw pre;
119                    }
120                    catch (Exception e) {
121                            throw new PortalResiliencyException(
122                                    "Unable to launch SPI process", e);
123                    }
124                    finally {
125                            weldServerFutureTask.cancel(true);
126    
127                            SPISynchronousQueueUtil.destroySynchronousQueue(spiUUID);
128                    }
129            }
130    
131            public abstract String getClassPath();
132    
133            @Override
134            public String toString() {
135                    StringBundler sb = new StringBundler(5);
136    
137                    sb.append("{name=");
138                    sb.append(getName());
139                    sb.append(", classPath=");
140                    sb.append(getClassPath());
141                    sb.append("}");
142    
143                    return sb.toString();
144            }
145    
146            protected static class WeldServerCallable
147                    implements Callable<RegistrationReference> {
148    
149                    public WeldServerCallable(Welder welder) {
150                            _welder = welder;
151                    }
152    
153                    @Override
154                    public RegistrationReference call() throws Exception {
155                            return _welder.weld(MPIHelperUtil.getIntraband());
156                    }
157    
158                    private final Welder _welder;
159    
160            }
161    
162    }