001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.resiliency.spi.remote;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
021    import com.liferay.portal.kernel.nio.intraband.welder.Welder;
022    import com.liferay.portal.kernel.nio.intraband.welder.WelderFactoryUtil;
023    import com.liferay.portal.kernel.process.ProcessCallable;
024    import com.liferay.portal.kernel.process.ProcessException;
025    import com.liferay.portal.kernel.process.local.LocalProcessLauncher;
026    import com.liferay.portal.kernel.process.log.ProcessOutputStream;
027    import com.liferay.portal.kernel.resiliency.mpi.MPI;
028    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
029    import com.liferay.portal.kernel.resiliency.spi.SPI;
030    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
031    import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
032    import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgentFactoryUtil;
033    import com.liferay.portal.kernel.resiliency.spi.provider.SPISynchronousQueueUtil;
034    import com.liferay.portal.kernel.util.PropsKeys;
035    
036    import java.io.IOException;
037    import java.io.ObjectInputStream;
038    import java.io.ObjectOutputStream;
039    
040    import java.rmi.Remote;
041    import java.rmi.RemoteException;
042    import java.rmi.server.UnicastRemoteObject;
043    
044    import java.util.UUID;
045    import java.util.concurrent.ConcurrentMap;
046    import java.util.concurrent.CountDownLatch;
047    import java.util.concurrent.Future;
048    import java.util.concurrent.TimeUnit;
049    
050    /**
051     * @author Shuyang Zhou
052     */
053    public abstract class RemoteSPI implements ProcessCallable<SPI>, Remote, SPI {
054    
055            public RemoteSPI(SPIConfiguration spiConfiguration) {
056                    this.spiConfiguration = spiConfiguration;
057    
058                    mpi = MPIHelperUtil.getMPI();
059    
060                    UUID uuidObject = UUID.randomUUID();
061    
062                    uuid = uuidObject.toString();
063    
064                    welder = WelderFactoryUtil.createWelder();
065            }
066    
067            @Override
068            public SPI call() throws ProcessException {
069                    try {
070                            SPIShutdownHook spiShutdownHook = new SPIShutdownHook();
071    
072                            LocalProcessLauncher.ProcessContext.attach(
073                                    spiConfiguration.getSPIId(), spiConfiguration.getPingInterval(),
074                                    spiShutdownHook);
075    
076                            Runtime runtime = Runtime.getRuntime();
077    
078                            runtime.addShutdownHook(spiShutdownHook);
079    
080                            SPI spi = (SPI)UnicastRemoteObject.exportObject(this, 0);
081    
082                            RegisterCallback registerCallback = new RegisterCallback(uuid, spi);
083    
084                            ProcessOutputStream processOutputStream =
085                                    LocalProcessLauncher.ProcessContext.getProcessOutputStream();
086    
087                            processOutputStream.writeProcessCallable(registerCallback);
088    
089                            registrationReference = welder.weld(MPIHelperUtil.getIntraband());
090    
091                            ConcurrentMap<String, Object> attributes =
092                                    LocalProcessLauncher.ProcessContext.getAttributes();
093    
094                            attributes.put(SPI.SPI_INSTANCE_PUBLICATION_KEY, this);
095    
096                            return spi;
097                    }
098                    catch (RemoteException re) {
099                            throw new ProcessException("Failed to export SPI as RMI stub.", re);
100                    }
101                    catch (IOException ioe) {
102                            throw new ProcessException(ioe);
103                    }
104            }
105    
106            @Override
107            public void destroy() throws RemoteException {
108                    try {
109                            doDestroy();
110    
111                            if (countDownLatch != null) {
112                                    countDownLatch.countDown();
113                            }
114                    }
115                    finally {
116                            UnicastRemoteObject.unexportObject(RemoteSPI.this, true);
117                    }
118            }
119    
120            @Override
121            public MPI getMPI() {
122                    return mpi;
123            }
124    
125            @Override
126            public RegistrationReference getRegistrationReference() {
127                    return registrationReference;
128            }
129    
130            @Override
131            public SPIAgent getSPIAgent() {
132                    if (spiAgent == null) {
133                            spiAgent = SPIAgentFactoryUtil.createSPIAgent(
134                                    spiConfiguration, registrationReference);
135                    }
136    
137                    return spiAgent;
138            }
139    
140            @Override
141            public SPIConfiguration getSPIConfiguration() {
142                    return spiConfiguration;
143            }
144    
145            public String getUUID() {
146                    return uuid;
147            }
148    
149            public Welder getWelder() {
150                    return welder;
151            }
152    
153            @Override
154            public boolean isAlive() {
155                    return true;
156            }
157    
158            protected abstract void doDestroy() throws RemoteException;
159    
160            protected transient CountDownLatch countDownLatch;
161            protected final MPI mpi;
162            protected RegistrationReference registrationReference;
163            protected transient volatile SPIAgent spiAgent;
164            protected final SPIConfiguration spiConfiguration;
165            protected final String uuid;
166            protected final Welder welder;
167    
168            protected static class RegisterCallback implements ProcessCallable<SPI> {
169    
170                    public RegisterCallback(String spiUUID, SPI spi) {
171                            _spiUUID = spiUUID;
172                            _spi = spi;
173                    }
174    
175                    @Override
176                    public SPI call() throws ProcessException {
177                            try {
178                                    SPISynchronousQueueUtil.notifySynchronousQueue(_spiUUID, _spi);
179                            }
180                            catch (InterruptedException ie) {
181                                    throw new ProcessException(ie);
182                            }
183    
184                            return _spi;
185                    }
186    
187                    private static final long serialVersionUID = 1L;
188    
189                    private final SPI _spi;
190                    private final String _spiUUID;
191    
192            }
193    
194            protected static class UnregisterSPIProcessCallable
195                    implements ProcessCallable<Boolean> {
196    
197                    public UnregisterSPIProcessCallable(
198                            String spiProviderName, String spiId) {
199    
200                            _spiProviderName = spiProviderName;
201                            _spiId = spiId;
202                    }
203    
204                    @Override
205                    public Boolean call() {
206                            SPI spi = MPIHelperUtil.getSPI(_spiProviderName, _spiId);
207    
208                            if (spi != null) {
209                                    return MPIHelperUtil.unregisterSPI(spi);
210                            }
211    
212                            return false;
213                    }
214    
215                    private static final long serialVersionUID = 1L;
216    
217                    private final String _spiId;
218                    private final String _spiProviderName;
219    
220            }
221    
222            protected class SPIShutdownHook
223                    extends Thread implements LocalProcessLauncher.ShutdownHook {
224    
225                    public SPIShutdownHook() {
226                            setDaemon(true);
227                            setName(SPIShutdownHook.class.getSimpleName());
228                    }
229    
230                    @Override
231                    public void run() {
232                            if (countDownLatch.getCount() == 0) {
233                                    return;
234                            }
235    
236                            boolean unregistered = false;
237    
238                            try {
239                                    Future<Boolean> future = IntrabandRPCUtil.execute(
240                                            registrationReference,
241                                            new UnregisterSPIProcessCallable(
242                                                    getSPIProviderName(), spiConfiguration.getSPIId()));
243    
244                                    unregistered = future.get();
245                            }
246                            catch (Exception e) {
247                                    if (_log.isWarnEnabled()) {
248                                            _log.warn("Unable to unregister SPI from MPI", e);
249                                    }
250                            }
251    
252                            if (unregistered || !waitForMPI()) {
253                                    doShutdown();
254                            }
255                    }
256    
257                    @Override
258                    public boolean shutdown(int shutdownCode, Throwable shutdownThrowable) {
259                            Runtime runtime = Runtime.getRuntime();
260    
261                            runtime.removeShutdownHook(this);
262    
263                            doShutdown();
264    
265                            return true;
266                    }
267    
268                    private void doShutdown() {
269                            try {
270                                    RemoteSPI.this.stop();
271                            }
272                            catch (RemoteException re) {
273                                    _log.error("Unable to stop SPI", re);
274                            }
275    
276                            try {
277                                    RemoteSPI.this.destroy();
278                            }
279                            catch (RemoteException re) {
280                                    _log.error("Unable to destroy SPI", re);
281                            }
282                    }
283    
284                    private boolean waitForMPI() {
285                            if (_log.isInfoEnabled()) {
286                                    _log.info(
287                                            "Wait up to " + spiConfiguration.getShutdownTimeout() +
288                                                    " ms for MPI shutdown request");
289                            }
290    
291                            try {
292                                    if (countDownLatch.await(
293                                                    spiConfiguration.getShutdownTimeout(),
294                                                    TimeUnit.MILLISECONDS)) {
295    
296                                            if (_log.isInfoEnabled()) {
297                                                    _log.info("MPI shutdown request received");
298                                            }
299    
300                                            return true;
301                                    }
302                            }
303                            catch (InterruptedException ie) {
304                            }
305    
306                            if (_log.isInfoEnabled()) {
307                                    _log.info("Proceed with SPI shutdown");
308                            }
309    
310                            return false;
311                    }
312    
313            }
314    
315            private void readObject(ObjectInputStream objectInputStream)
316                    throws ClassNotFoundException, IOException {
317    
318                    objectInputStream.defaultReadObject();
319    
320                    System.setProperty(
321                            PropsKeys.INTRABAND_IMPL, objectInputStream.readUTF());
322                    System.setProperty(
323                            PropsKeys.INTRABAND_TIMEOUT_DEFAULT, objectInputStream.readUTF());
324                    System.setProperty(
325                            PropsKeys.INTRABAND_WELDER_IMPL, objectInputStream.readUTF());
326                    System.setProperty(
327                            "portal:" + PropsKeys.LIFERAY_HOME, objectInputStream.readUTF());
328    
329                    // Disable auto deploy
330    
331                    System.setProperty("portal:" + PropsKeys.AUTO_DEPLOY_ENABLED, "false");
332    
333                    // Disable cluster link
334    
335                    System.setProperty("portal:" + PropsKeys.CLUSTER_LINK_ENABLED, "false");
336    
337                    // Disable dependency management
338    
339                    System.setProperty(
340                            "portal:" + PropsKeys.HOT_DEPLOY_DEPENDENCY_MANAGEMENT_ENABLED,
341                            "false");
342    
343                    // Log4j log file postfix
344    
345                    System.setProperty("spi.id", "-" + spiConfiguration.getSPIId());
346    
347                    countDownLatch = new CountDownLatch(1);
348            }
349    
350            private void writeObject(ObjectOutputStream objectOutputStream)
351                    throws IOException {
352    
353                    objectOutputStream.defaultWriteObject();
354    
355                    objectOutputStream.writeUTF(
356                            System.getProperty(PropsKeys.INTRABAND_IMPL));
357                    objectOutputStream.writeUTF(
358                            System.getProperty(PropsKeys.INTRABAND_TIMEOUT_DEFAULT));
359                    objectOutputStream.writeUTF(
360                            System.getProperty(PropsKeys.INTRABAND_WELDER_IMPL));
361                    objectOutputStream.writeUTF(System.getProperty(PropsKeys.LIFERAY_HOME));
362            }
363    
364            private static final Log _log = LogFactoryUtil.getLog(RemoteSPI.class);
365    
366    }