001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.resiliency.spi.remote;
016    
017    import com.liferay.portal.kernel.deploy.hot.DependencyManagementThreadLocal;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
021    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
022    import com.liferay.portal.kernel.nio.intraband.welder.Welder;
023    import com.liferay.portal.kernel.nio.intraband.welder.WelderFactoryUtil;
024    import com.liferay.portal.kernel.process.ProcessCallable;
025    import com.liferay.portal.kernel.process.ProcessException;
026    import com.liferay.portal.kernel.process.local.LocalProcessLauncher;
027    import com.liferay.portal.kernel.process.log.ProcessOutputStream;
028    import com.liferay.portal.kernel.resiliency.mpi.MPI;
029    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
030    import com.liferay.portal.kernel.resiliency.spi.SPI;
031    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
032    import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
033    import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgentFactoryUtil;
034    import com.liferay.portal.kernel.resiliency.spi.provider.SPISynchronousQueueUtil;
035    import com.liferay.portal.kernel.util.PropsKeys;
036    import com.liferay.portal.kernel.util.ReflectionUtil;
037    
038    import java.io.IOException;
039    import java.io.ObjectInputStream;
040    import java.io.ObjectOutputStream;
041    
042    import java.lang.reflect.Field;
043    
044    import java.rmi.Remote;
045    import java.rmi.RemoteException;
046    import java.rmi.server.UnicastRemoteObject;
047    
048    import java.util.UUID;
049    import java.util.concurrent.ConcurrentMap;
050    import java.util.concurrent.CountDownLatch;
051    import java.util.concurrent.Future;
052    import java.util.concurrent.TimeUnit;
053    
054    /**
055     * @author Shuyang Zhou
056     */
057    public abstract class RemoteSPI implements ProcessCallable<SPI>, Remote, SPI {
058    
059            public RemoteSPI(SPIConfiguration spiConfiguration) {
060                    this.spiConfiguration = spiConfiguration;
061    
062                    mpi = MPIHelperUtil.getMPI();
063    
064                    UUID uuidObject = UUID.randomUUID();
065    
066                    uuid = uuidObject.toString();
067    
068                    welder = WelderFactoryUtil.createWelder();
069            }
070    
071            @Override
072            public SPI call() throws ProcessException {
073                    try {
074                            SPIShutdownHook spiShutdownHook = new SPIShutdownHook();
075    
076                            LocalProcessLauncher.ProcessContext.attach(
077                                    spiConfiguration.getSPIId(), spiConfiguration.getPingInterval(),
078                                    spiShutdownHook);
079    
080                            Runtime runtime = Runtime.getRuntime();
081    
082                            runtime.addShutdownHook(spiShutdownHook);
083    
084                            SPI spi = (SPI)UnicastRemoteObject.exportObject(this, 0);
085    
086                            RegisterCallback registerCallback = new RegisterCallback(uuid, spi);
087    
088                            ProcessOutputStream processOutputStream =
089                                    LocalProcessLauncher.ProcessContext.getProcessOutputStream();
090    
091                            processOutputStream.writeProcessCallable(registerCallback);
092    
093                            registrationReference = welder.weld(MPIHelperUtil.getIntraband());
094    
095                            ConcurrentMap<String, Object> attributes =
096                                    LocalProcessLauncher.ProcessContext.getAttributes();
097    
098                            attributes.put(SPI.SPI_INSTANCE_PUBLICATION_KEY, this);
099    
100                            return spi;
101                    }
102                    catch (RemoteException re) {
103                            throw new ProcessException("Failed to export SPI as RMI stub.", re);
104                    }
105                    catch (IOException ioe) {
106                            throw new ProcessException(ioe);
107                    }
108            }
109    
110            @Override
111            public void destroy() throws RemoteException {
112                    try {
113                            doDestroy();
114    
115                            if (countDownLatch != null) {
116                                    countDownLatch.countDown();
117                            }
118                    }
119                    finally {
120                            UnicastRemoteObject.unexportObject(RemoteSPI.this, true);
121                    }
122            }
123    
124            @Override
125            public MPI getMPI() {
126                    return mpi;
127            }
128    
129            @Override
130            public RegistrationReference getRegistrationReference() {
131                    return registrationReference;
132            }
133    
134            @Override
135            public SPIAgent getSPIAgent() {
136                    if (spiAgent == null) {
137                            spiAgent = SPIAgentFactoryUtil.createSPIAgent(
138                                    spiConfiguration, registrationReference);
139                    }
140    
141                    return spiAgent;
142            }
143    
144            @Override
145            public SPIConfiguration getSPIConfiguration() {
146                    return spiConfiguration;
147            }
148    
149            public String getUUID() {
150                    return uuid;
151            }
152    
153            public Welder getWelder() {
154                    return welder;
155            }
156    
157            @Override
158            public boolean isAlive() {
159                    return true;
160            }
161    
162            protected abstract void doDestroy() throws RemoteException;
163    
164            protected transient CountDownLatch countDownLatch;
165            protected final MPI mpi;
166            protected RegistrationReference registrationReference;
167            protected transient volatile SPIAgent spiAgent;
168            protected final SPIConfiguration spiConfiguration;
169            protected final String uuid;
170            protected final Welder welder;
171    
172            protected static class RegisterCallback implements ProcessCallable<SPI> {
173    
174                    public RegisterCallback(String spiUUID, SPI spi) {
175                            _spiUUID = spiUUID;
176                            _spi = spi;
177                    }
178    
179                    @Override
180                    public SPI call() throws ProcessException {
181                            try {
182                                    SPISynchronousQueueUtil.notifySynchronousQueue(_spiUUID, _spi);
183                            }
184                            catch (InterruptedException ie) {
185                                    throw new ProcessException(ie);
186                            }
187    
188                            return _spi;
189                    }
190    
191                    private static final long serialVersionUID = 1L;
192    
193                    private final SPI _spi;
194                    private final String _spiUUID;
195    
196            }
197    
198            protected static class UnregisterSPIProcessCallable
199                    implements ProcessCallable<Boolean> {
200    
201                    public UnregisterSPIProcessCallable(
202                            String spiProviderName, String spiId) {
203    
204                            _spiProviderName = spiProviderName;
205                            _spiId = spiId;
206                    }
207    
208                    @Override
209                    public Boolean call() {
210                            SPI spi = MPIHelperUtil.getSPI(_spiProviderName, _spiId);
211    
212                            if (spi != null) {
213                                    return MPIHelperUtil.unregisterSPI(spi);
214                            }
215    
216                            return false;
217                    }
218    
219                    private static final long serialVersionUID = 1L;
220    
221                    private final String _spiId;
222                    private final String _spiProviderName;
223    
224            }
225    
226            protected class SPIShutdownHook
227                    extends Thread implements LocalProcessLauncher.ShutdownHook {
228    
229                    public SPIShutdownHook() {
230                            setDaemon(true);
231                            setName(SPIShutdownHook.class.getSimpleName());
232                    }
233    
234                    @Override
235                    public void run() {
236                            if (countDownLatch.getCount() == 0) {
237                                    return;
238                            }
239    
240                            boolean unregistered = false;
241    
242                            try {
243                                    Future<Boolean> future = IntrabandRPCUtil.execute(
244                                            registrationReference,
245                                            new UnregisterSPIProcessCallable(
246                                                    getSPIProviderName(), spiConfiguration.getSPIId()));
247    
248                                    unregistered = future.get();
249                            }
250                            catch (Exception e) {
251                                    if (_log.isWarnEnabled()) {
252                                            _log.warn("Unable to unregister SPI from MPI", e);
253                                    }
254                            }
255    
256                            if (unregistered || !waitForMPI()) {
257                                    doShutdown();
258                            }
259                    }
260    
261                    @Override
262                    public boolean shutdown(int shutdownCode, Throwable shutdownThrowable) {
263                            Runtime runtime = Runtime.getRuntime();
264    
265                            runtime.removeShutdownHook(this);
266    
267                            doShutdown();
268    
269                            return true;
270                    }
271    
272                    private void doShutdown() {
273                            try {
274                                    RemoteSPI.this.stop();
275                            }
276                            catch (RemoteException re) {
277                                    _log.error("Unable to stop SPI", re);
278                            }
279    
280                            try {
281                                    RemoteSPI.this.destroy();
282                            }
283                            catch (RemoteException re) {
284                                    _log.error("Unable to destroy SPI", re);
285                            }
286                    }
287    
288                    private boolean waitForMPI() {
289                            if (_log.isInfoEnabled()) {
290                                    _log.info(
291                                            "Wait up to " + spiConfiguration.getShutdownTimeout() +
292                                                    " ms for MPI shutdown request");
293                            }
294    
295                            try {
296                                    if (countDownLatch.await(
297                                                    spiConfiguration.getShutdownTimeout(),
298                                                    TimeUnit.MILLISECONDS)) {
299    
300                                            if (_log.isInfoEnabled()) {
301                                                    _log.info("MPI shutdown request received");
302                                            }
303    
304                                            return true;
305                                    }
306                            }
307                            catch (InterruptedException ie) {
308                            }
309    
310                            if (_log.isInfoEnabled()) {
311                                    _log.info("Proceed with SPI shutdown");
312                            }
313    
314                            return false;
315                    }
316    
317            }
318    
319            private void readObject(ObjectInputStream objectInputStream)
320                    throws ClassNotFoundException, IOException {
321    
322                    objectInputStream.defaultReadObject();
323    
324                    System.setProperty(
325                            PropsKeys.INTRABAND_IMPL, objectInputStream.readUTF());
326                    System.setProperty(
327                            PropsKeys.INTRABAND_TIMEOUT_DEFAULT, objectInputStream.readUTF());
328                    System.setProperty(
329                            PropsKeys.INTRABAND_WELDER_IMPL, objectInputStream.readUTF());
330                    System.setProperty(
331                            "portal:" + PropsKeys.LIFERAY_HOME, objectInputStream.readUTF());
332    
333                    // Disable auto deploy
334    
335                    System.setProperty("portal:" + PropsKeys.AUTO_DEPLOY_ENABLED, "false");
336    
337                    // Disable cluster link
338    
339                    System.setProperty("portal:" + PropsKeys.CLUSTER_LINK_ENABLED, "false");
340    
341                    // Disable dependency management
342    
343                    try {
344                            Field enabledField = ReflectionUtil.getDeclaredField(
345                                    DependencyManagementThreadLocal.class, "_enabled");
346    
347                            enabledField.set(
348                                    null,
349                                    new ThreadLocal<Boolean>() {
350    
351                                            @Override
352                                            public Boolean get() {
353                                                    return Boolean.FALSE;
354                                            }
355    
356                                    });
357                    }
358                    catch (Exception e) {
359                            throw new IOException("Unable to disable dependency management", e);
360                    }
361    
362                    // Log4j log file postfix
363    
364                    System.setProperty("spi.id", "-" + spiConfiguration.getSPIId());
365    
366                    countDownLatch = new CountDownLatch(1);
367            }
368    
369            private void writeObject(ObjectOutputStream objectOutputStream)
370                    throws IOException {
371    
372                    objectOutputStream.defaultWriteObject();
373    
374                    objectOutputStream.writeUTF(
375                            System.getProperty(PropsKeys.INTRABAND_IMPL));
376                    objectOutputStream.writeUTF(
377                            System.getProperty(PropsKeys.INTRABAND_TIMEOUT_DEFAULT));
378                    objectOutputStream.writeUTF(
379                            System.getProperty(PropsKeys.INTRABAND_WELDER_IMPL));
380                    objectOutputStream.writeUTF(System.getProperty(PropsKeys.LIFERAY_HOME));
381            }
382    
383            private static final Log _log = LogFactoryUtil.getLog(RemoteSPI.class);
384    
385    }