001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process.local;
016    
017    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedOutputStream;
018    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
019    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
020    import com.liferay.portal.kernel.process.ClassPathUtil;
021    import com.liferay.portal.kernel.process.ProcessCallable;
022    import com.liferay.portal.kernel.process.ProcessException;
023    import com.liferay.portal.kernel.process.log.ProcessOutputStream;
024    import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
025    import com.liferay.portal.kernel.util.StringPool;
026    
027    import java.io.FileDescriptor;
028    import java.io.FileOutputStream;
029    import java.io.IOException;
030    import java.io.ObjectInputStream;
031    import java.io.ObjectOutputStream;
032    import java.io.PrintStream;
033    import java.io.Serializable;
034    
035    import java.net.URLClassLoader;
036    
037    import java.util.concurrent.ConcurrentHashMap;
038    import java.util.concurrent.ConcurrentMap;
039    import java.util.concurrent.atomic.AtomicReference;
040    
041    /**
042     * @author Shuyang Zhou
043     */
044    public class LocalProcessLauncher {
045    
046            public static void main(String[] arguments) throws IOException {
047                    PrintStream oldOutPrintStream = System.out;
048    
049                    ObjectOutputStream objectOutputStream = null;
050                    ProcessOutputStream outProcessOutputStream = null;
051    
052                    synchronized (oldOutPrintStream) {
053                            oldOutPrintStream.flush();
054    
055                            FileOutputStream fileOutputStream = new FileOutputStream(
056                                    FileDescriptor.out);
057    
058                            objectOutputStream = new ObjectOutputStream(
059                                    new UnsyncBufferedOutputStream(fileOutputStream));
060    
061                            outProcessOutputStream = new ProcessOutputStream(
062                                    objectOutputStream, false);
063    
064                            ProcessContext._setProcessOutputStream(outProcessOutputStream);
065    
066                            PrintStream newOutPrintStream = new PrintStream(
067                                    outProcessOutputStream, true);
068    
069                            System.setOut(newOutPrintStream);
070                    }
071    
072                    ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
073                            objectOutputStream, true);
074    
075                    PrintStream errPrintStream = new PrintStream(
076                            errProcessOutputStream, true);
077    
078                    System.setErr(errPrintStream);
079    
080                    Thread currentThread = Thread.currentThread();
081    
082                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
083    
084                    try {
085                            ObjectInputStream bootstrapObjectInputStream =
086                                    new ObjectInputStream(System.in);
087    
088                            String processCallableName =
089                                    (String)bootstrapObjectInputStream.readObject();
090    
091                            String logPrefixString =
092                                    StringPool.OPEN_BRACKET.concat(processCallableName).concat(
093                                            StringPool.CLOSE_BRACKET);
094    
095                            byte[] logPrefix = logPrefixString.getBytes(StringPool.UTF8);
096    
097                            outProcessOutputStream.setLogPrefix(logPrefix);
098                            errProcessOutputStream.setLogPrefix(logPrefix);
099    
100                            String classPath = (String)bootstrapObjectInputStream.readObject();
101    
102                            ClassLoader classLoader = new URLClassLoader(
103                                    ClassPathUtil.getClassPathURLs(classPath));
104    
105                            currentThread.setContextClassLoader(classLoader);
106    
107                            ObjectInputStream objectInputStream =
108                                    new ClassLoaderObjectInputStream(
109                                            bootstrapObjectInputStream, classLoader);
110    
111                            ProcessCallable<?> processCallable =
112                                    (ProcessCallable<?>)objectInputStream.readObject();
113    
114                            Thread thread = new Thread(
115                                    new ProcessCallableRunner(objectInputStream),
116                                    "ProcessCallable-Runner");
117    
118                            thread.setDaemon(true);
119    
120                            thread.start();
121    
122                            Serializable result = processCallable.call();
123    
124                            System.out.flush();
125    
126                            outProcessOutputStream.writeProcessCallable(
127                                    new ReturnProcessCallable<Serializable>(result));
128    
129                            outProcessOutputStream.flush();
130                    }
131                    catch (Throwable t) {
132                            errPrintStream.flush();
133    
134                            ProcessException processException = null;
135    
136                            if (t instanceof ProcessException) {
137                                    processException = (ProcessException)t;
138                            }
139                            else {
140                                    processException = new ProcessException(t);
141                            }
142    
143                            errProcessOutputStream.writeProcessCallable(
144                                    new ExceptionProcessCallable(processException));
145    
146                            errProcessOutputStream.flush();
147                    }
148                    finally {
149                            currentThread.setContextClassLoader(contextClassLoader);
150                    }
151            }
152    
153            public static class ProcessContext {
154    
155                    public static boolean attach(
156                            String message, long interval, ShutdownHook shutdownHook) {
157    
158                            HeartbeatThread heartbeatThread = new HeartbeatThread(
159                                    message, interval, shutdownHook);
160    
161                            boolean value = _heartbeatThreadReference.compareAndSet(
162                                    null, heartbeatThread);
163    
164                            if (value) {
165                                    heartbeatThread.start();
166                            }
167    
168                            return value;
169                    }
170    
171                    public static void detach() throws InterruptedException {
172                            HeartbeatThread heartbeatThread =
173                                    _heartbeatThreadReference.getAndSet(null);
174    
175                            if (heartbeatThread != null) {
176                                    heartbeatThread.detach();
177                                    heartbeatThread.join();
178                            }
179                    }
180    
181                    public static ConcurrentMap<String, Object> getAttributes() {
182                            return _attributes;
183                    }
184    
185                    public static ProcessOutputStream getProcessOutputStream() {
186                            return _processOutputStream;
187                    }
188    
189                    public static boolean isAttached() {
190                            HeartbeatThread attachThread = _heartbeatThreadReference.get();
191    
192                            if (attachThread != null) {
193                                    return true;
194                            }
195                            else {
196                                    return false;
197                            }
198                    }
199    
200                    private static void _setProcessOutputStream(
201                            ProcessOutputStream processOutputStream) {
202    
203                            _processOutputStream = processOutputStream;
204                    }
205    
206                    private ProcessContext() {
207                    }
208    
209                    private static final ConcurrentMap<String, Object> _attributes =
210                            new ConcurrentHashMap<>();
211                    private static final AtomicReference<HeartbeatThread>
212                            _heartbeatThreadReference = new AtomicReference<>();
213                    private static ProcessOutputStream _processOutputStream;
214    
215            }
216    
217            public interface ShutdownHook {
218    
219                    public static final int BROKEN_PIPE_CODE = 1;
220    
221                    public static final int INTERRUPTION_CODE = 2;
222    
223                    public static final int UNKNOWN_CODE = 3;
224    
225                    public boolean shutdown(int shutdownCode, Throwable shutdownThrowable);
226    
227            }
228    
229            private static class HeartbeatThread extends Thread {
230    
231                    public HeartbeatThread(
232                            String message, long interval, ShutdownHook shutdownHook) {
233    
234                            if (shutdownHook == null) {
235                                    throw new IllegalArgumentException("Shutdown hook is null");
236                            }
237    
238                            _interval = interval;
239                            _shutdownHook = shutdownHook;
240    
241                            _pringBackProcessCallable = new PingbackProcessCallable(message);
242    
243                            setDaemon(true);
244                            setName(HeartbeatThread.class.getSimpleName());
245                    }
246    
247                    public void detach() {
248                            _detach = true;
249    
250                            interrupt();
251                    }
252    
253                    @Override
254                    public void run() {
255                            ProcessOutputStream processOutputStream =
256                                    ProcessContext.getProcessOutputStream();
257    
258                            int shutdownCode = 0;
259                            Throwable shutdownThrowable = null;
260    
261                            while (!_detach) {
262                                    try {
263                                            sleep(_interval);
264    
265                                            processOutputStream.writeProcessCallable(
266                                                    _pringBackProcessCallable);
267                                    }
268                                    catch (InterruptedException ie) {
269                                            if (_detach) {
270                                                    return;
271                                            }
272                                            else {
273                                                    shutdownThrowable = ie;
274    
275                                                    shutdownCode = ShutdownHook.INTERRUPTION_CODE;
276                                            }
277                                    }
278                                    catch (IOException ioe) {
279                                            shutdownThrowable = ioe;
280    
281                                            shutdownCode = ShutdownHook.BROKEN_PIPE_CODE;
282                                    }
283                                    catch (Throwable throwable) {
284                                            shutdownThrowable = throwable;
285    
286                                            shutdownCode = ShutdownHook.UNKNOWN_CODE;
287                                    }
288    
289                                    if (shutdownCode != 0) {
290                                            _detach = _shutdownHook.shutdown(
291                                                    shutdownCode, shutdownThrowable);
292                                    }
293                            }
294                    }
295    
296                    private volatile boolean _detach;
297                    private final long _interval;
298                    private final ProcessCallable<String> _pringBackProcessCallable;
299                    private final ShutdownHook _shutdownHook;
300    
301            }
302    
303            private static class PingbackProcessCallable
304                    implements ProcessCallable<String> {
305    
306                    public PingbackProcessCallable(String message) {
307                            _message = message;
308                    }
309    
310                    @Override
311                    public String call() {
312                            return _message;
313                    }
314    
315                    private static final long serialVersionUID = 1L;
316    
317                    private final String _message;
318    
319            }
320    
321            private static class ProcessCallableRunner implements Runnable {
322    
323                    public ProcessCallableRunner(ObjectInputStream objectInputStream) {
324                            _objectInputStream = objectInputStream;
325                    }
326    
327                    @Override
328                    public void run() {
329                            while (true) {
330                                    try {
331                                            ProcessCallable<?> processCallable =
332                                                    (ProcessCallable<?>)_objectInputStream.readObject();
333    
334                                            processCallable.call();
335                                    }
336                                    catch (Exception e) {
337                                            UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
338                                                    new UnsyncByteArrayOutputStream();
339    
340                                            UnsyncPrintWriter unsyncPrintWriter = new UnsyncPrintWriter(
341                                                    unsyncByteArrayOutputStream);
342    
343                                            unsyncPrintWriter.println(e);
344    
345                                            e.printStackTrace(unsyncPrintWriter);
346    
347                                            unsyncPrintWriter.println();
348    
349                                            unsyncPrintWriter.flush();
350    
351                                            System.err.write(
352                                                    unsyncByteArrayOutputStream.unsafeGetByteArray(), 0,
353                                                    unsyncByteArrayOutputStream.size());
354                                            System.err.flush();
355                                    }
356                            }
357                    }
358    
359                    private final ObjectInputStream _objectInputStream;
360    
361            }
362    
363    }