001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process.local;
016    
017    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedOutputStream;
018    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
019    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
020    import com.liferay.portal.kernel.process.ClassPathUtil;
021    import com.liferay.portal.kernel.process.ProcessCallable;
022    import com.liferay.portal.kernel.process.ProcessException;
023    import com.liferay.portal.kernel.process.log.ProcessOutputStream;
024    import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
025    import com.liferay.portal.kernel.util.StringPool;
026    
027    import java.io.FileDescriptor;
028    import java.io.FileOutputStream;
029    import java.io.IOException;
030    import java.io.ObjectInputStream;
031    import java.io.ObjectOutputStream;
032    import java.io.PrintStream;
033    import java.io.Serializable;
034    
035    import java.net.URLClassLoader;
036    
037    import java.util.concurrent.ConcurrentHashMap;
038    import java.util.concurrent.ConcurrentMap;
039    import java.util.concurrent.atomic.AtomicReference;
040    
041    /**
042     * @author Shuyang Zhou
043     */
044    public class LocalProcessLauncher {
045    
046            public static void main(String[] arguments)
047                    throws ClassNotFoundException, IOException {
048    
049                    PrintStream oldOutPrintStream = System.out;
050    
051                    ObjectOutputStream objectOutputStream = null;
052                    ProcessOutputStream outProcessOutputStream = null;
053    
054                    synchronized (oldOutPrintStream) {
055                            oldOutPrintStream.flush();
056    
057                            FileOutputStream fileOutputStream = new FileOutputStream(
058                                    FileDescriptor.out);
059    
060                            objectOutputStream = new ObjectOutputStream(
061                                    new UnsyncBufferedOutputStream(fileOutputStream));
062    
063                            outProcessOutputStream = new ProcessOutputStream(
064                                    objectOutputStream, false);
065    
066                            ProcessContext._setProcessOutputStream(outProcessOutputStream);
067    
068                            PrintStream newOutPrintStream = new PrintStream(
069                                    outProcessOutputStream, true);
070    
071                            System.setOut(newOutPrintStream);
072                    }
073    
074                    ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
075                            objectOutputStream, true);
076    
077                    PrintStream errPrintStream = new PrintStream(
078                            errProcessOutputStream, true);
079    
080                    System.setErr(errPrintStream);
081    
082                    Thread currentThread = Thread.currentThread();
083    
084                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
085    
086                    try {
087                            ObjectInputStream bootstrapObjectInputStream =
088                                    new ObjectInputStream(System.in);
089    
090                            String processCallableName =
091                                    (String)bootstrapObjectInputStream.readObject();
092    
093                            String logPrefixString =
094                                    StringPool.OPEN_BRACKET.concat(processCallableName).concat(
095                                            StringPool.CLOSE_BRACKET);
096    
097                            byte[] logPrefix = logPrefixString.getBytes(StringPool.UTF8);
098    
099                            outProcessOutputStream.setLogPrefix(logPrefix);
100                            errProcessOutputStream.setLogPrefix(logPrefix);
101    
102                            String classPath = (String)bootstrapObjectInputStream.readObject();
103    
104                            ClassLoader classLoader = new URLClassLoader(
105                                    ClassPathUtil.getClassPathURLs(classPath));
106    
107                            currentThread.setContextClassLoader(classLoader);
108    
109                            ObjectInputStream objectInputStream =
110                                    new ClassLoaderObjectInputStream(
111                                            bootstrapObjectInputStream, classLoader);
112    
113                            ProcessCallable<?> processCallable =
114                                    (ProcessCallable<?>)objectInputStream.readObject();
115    
116                            Thread thread = new Thread(
117                                    new ProcessCallableRunner(objectInputStream),
118                                    "ProcessCallable-Runner");
119    
120                            thread.setDaemon(true);
121    
122                            thread.start();
123    
124                            Serializable result = processCallable.call();
125    
126                            System.out.flush();
127    
128                            outProcessOutputStream.writeProcessCallable(
129                                    new ReturnProcessCallable<Serializable>(result));
130    
131                            outProcessOutputStream.flush();
132                    }
133                    catch (ProcessException pe) {
134                            errPrintStream.flush();
135    
136                            errProcessOutputStream.writeProcessCallable(
137                                    new ExceptionProcessCallable(pe));
138    
139                            errProcessOutputStream.flush();
140                    }
141                    finally {
142                            currentThread.setContextClassLoader(contextClassLoader);
143                    }
144            }
145    
146            public static class ProcessContext {
147    
148                    public static boolean attach(
149                            String message, long interval, ShutdownHook shutdownHook) {
150    
151                            HeartbeatThread heartbeatThread = new HeartbeatThread(
152                                    message, interval, shutdownHook);
153    
154                            boolean value = _heartbeatThreadReference.compareAndSet(
155                                    null, heartbeatThread);
156    
157                            if (value) {
158                                    heartbeatThread.start();
159                            }
160    
161                            return value;
162                    }
163    
164                    public static void detach() throws InterruptedException {
165                            HeartbeatThread heartbeatThread =
166                                    _heartbeatThreadReference.getAndSet(null);
167    
168                            if (heartbeatThread != null) {
169                                    heartbeatThread.detach();
170                                    heartbeatThread.join();
171                            }
172                    }
173    
174                    public static ConcurrentMap<String, Object> getAttributes() {
175                            return _attributes;
176                    }
177    
178                    public static ProcessOutputStream getProcessOutputStream() {
179                            return _processOutputStream;
180                    }
181    
182                    public static boolean isAttached() {
183                            HeartbeatThread attachThread = _heartbeatThreadReference.get();
184    
185                            if (attachThread != null) {
186                                    return true;
187                            }
188                            else {
189                                    return false;
190                            }
191                    }
192    
193                    private static void _setProcessOutputStream(
194                            ProcessOutputStream processOutputStream) {
195    
196                            _processOutputStream = processOutputStream;
197                    }
198    
199                    private ProcessContext() {
200                    }
201    
202                    private static final ConcurrentMap<String, Object> _attributes =
203                            new ConcurrentHashMap<String, Object>();
204                    private static final AtomicReference<HeartbeatThread>
205                            _heartbeatThreadReference = new AtomicReference<HeartbeatThread>();
206                    private static ProcessOutputStream _processOutputStream;
207    
208            }
209    
/**
 * Callback consulted by the heartbeat thread whenever a heartbeat cycle
 * fails.
 */
public interface ShutdownHook {

        /**
         * Shutdown code used when writing the heartbeat failed with an
         * <code>IOException</code>.
         */
        int BROKEN_PIPE_CODE = 1;

        /**
         * Shutdown code used when the heartbeat thread was interrupted without
         * having been asked to detach.
         */
        int INTERRUPTION_CODE = 2;

        /**
         * Shutdown code used when the heartbeat cycle failed with any other
         * throwable.
         */
        int UNKNOWN_CODE = 3;

        /**
         * Handles a failed heartbeat cycle.
         *
         * @param  shutdownCode one of {@link #BROKEN_PIPE_CODE}, {@link
         *         #INTERRUPTION_CODE} or {@link #UNKNOWN_CODE}
         * @param  shutdownThrowable the throwable that made the cycle fail
         * @return <code>true</code> if the heartbeat thread should stop;
         *         <code>false</code> if it should keep running
         */
        boolean shutdown(int shutdownCode, Throwable shutdownThrowable);

}
221    
222            private static class HeartbeatThread extends Thread {
223    
224                    public HeartbeatThread(
225                            String message, long interval, ShutdownHook shutdownHook) {
226    
227                            if (shutdownHook == null) {
228                                    throw new IllegalArgumentException("Shutdown hook is null");
229                            }
230    
231                            _interval = interval;
232                            _shutdownHook = shutdownHook;
233    
234                            _pringBackProcessCallable = new PingbackProcessCallable(message);
235    
236                            setDaemon(true);
237                            setName(HeartbeatThread.class.getSimpleName());
238                    }
239    
240                    public void detach() {
241                            _detach = true;
242    
243                            interrupt();
244                    }
245    
246                    @Override
247                    public void run() {
248                            ProcessOutputStream processOutputStream =
249                                    ProcessContext.getProcessOutputStream();
250    
251                            int shutdownCode = 0;
252                            Throwable shutdownThrowable = null;
253    
254                            while (!_detach) {
255                                    try {
256                                            sleep(_interval);
257    
258                                            processOutputStream.writeProcessCallable(
259                                                    _pringBackProcessCallable);
260                                    }
261                                    catch (InterruptedException ie) {
262                                            if (_detach) {
263                                                    return;
264                                            }
265                                            else {
266                                                    shutdownThrowable = ie;
267    
268                                                    shutdownCode = ShutdownHook.INTERRUPTION_CODE;
269                                            }
270                                    }
271                                    catch (IOException ioe) {
272                                            shutdownThrowable = ioe;
273    
274                                            shutdownCode = ShutdownHook.BROKEN_PIPE_CODE;
275                                    }
276                                    catch (Throwable throwable) {
277                                            shutdownThrowable = throwable;
278    
279                                            shutdownCode = ShutdownHook.UNKNOWN_CODE;
280                                    }
281    
282                                    if (shutdownCode != 0) {
283                                            _detach = _shutdownHook.shutdown(
284                                                    shutdownCode, shutdownThrowable);
285                                    }
286                            }
287                    }
288    
289                    private volatile boolean _detach;
290                    private final long _interval;
291                    private final ProcessCallable<String> _pringBackProcessCallable;
292                    private final ShutdownHook _shutdownHook;
293    
294            }
295    
296            private static class PingbackProcessCallable
297                    implements ProcessCallable<String> {
298    
299                    public PingbackProcessCallable(String message) {
300                            _message = message;
301                    }
302    
303                    @Override
304                    public String call() {
305                            return _message;
306                    }
307    
308                    private static final long serialVersionUID = 1L;
309    
310                    private final String _message;
311    
312            }
313    
314            private static class ProcessCallableRunner implements Runnable {
315    
316                    public ProcessCallableRunner(ObjectInputStream objectInputStream) {
317                            _objectInputStream = objectInputStream;
318                    }
319    
320                    @Override
321                    public void run() {
322                            while (true) {
323                                    try {
324                                            ProcessCallable<?> processCallable =
325                                                    (ProcessCallable<?>)_objectInputStream.readObject();
326    
327                                            processCallable.call();
328                                    }
329                                    catch (Exception e) {
330                                            UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
331                                                    new UnsyncByteArrayOutputStream();
332    
333                                            UnsyncPrintWriter unsyncPrintWriter = new UnsyncPrintWriter(
334                                                    unsyncByteArrayOutputStream);
335    
336                                            unsyncPrintWriter.println(e);
337    
338                                            e.printStackTrace(unsyncPrintWriter);
339    
340                                            unsyncPrintWriter.println();
341    
342                                            unsyncPrintWriter.flush();
343    
344                                            System.err.write(
345                                                    unsyncByteArrayOutputStream.unsafeGetByteArray(), 0,
346                                                    unsyncByteArrayOutputStream.size());
347                                            System.err.flush();
348                                    }
349                            }
350                    }
351    
352                    private final ObjectInputStream _objectInputStream;
353    
354            }
355    
356    }