001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
019    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedOutputStream;
020    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.process.log.ProcessOutputStream;
024    import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
025    import com.liferay.portal.kernel.util.NamedThreadFactory;
026    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027    import com.liferay.portal.kernel.util.StreamUtil;
028    import com.liferay.portal.kernel.util.StringPool;
029    
030    import java.io.EOFException;
031    import java.io.File;
032    import java.io.FileDescriptor;
033    import java.io.FileOutputStream;
034    import java.io.IOException;
035    import java.io.ObjectInputStream;
036    import java.io.ObjectOutputStream;
037    import java.io.PrintStream;
038    import java.io.Serializable;
039    import java.io.StreamCorruptedException;
040    
041    import java.util.ArrayList;
042    import java.util.Collections;
043    import java.util.Iterator;
044    import java.util.List;
045    import java.util.Set;
046    import java.util.concurrent.Callable;
047    import java.util.concurrent.ConcurrentHashMap;
048    import java.util.concurrent.ConcurrentMap;
049    import java.util.concurrent.ExecutionException;
050    import java.util.concurrent.ExecutorService;
051    import java.util.concurrent.Executors;
052    import java.util.concurrent.Future;
053    import java.util.concurrent.RejectedExecutionException;
054    import java.util.concurrent.TimeUnit;
055    import java.util.concurrent.TimeoutException;
056    import java.util.concurrent.atomic.AtomicReference;
057    
058    /**
059     * @author Shuyang Zhou
060     */
061    public class ProcessExecutor {
062    
063            public static <T extends Serializable> Future<T> execute(
064                            String classPath, List<String> arguments,
065                            ProcessCallable<? extends Serializable> processCallable)
066                    throws ProcessException {
067    
068                    return execute("java", classPath, arguments, processCallable);
069            }
070    
071            public static <T extends Serializable> Future<T> execute(
072                            String classPath,
073                            ProcessCallable<? extends Serializable> processCallable)
074                    throws ProcessException {
075    
076                    return execute(
077                            "java", classPath, Collections.<String>emptyList(),
078                            processCallable);
079            }
080    
081            public static <T extends Serializable> Future<T> execute(
082                            String java, String classPath, List<String> arguments,
083                            ProcessCallable<? extends Serializable> processCallable)
084                    throws ProcessException {
085    
086                    try {
087                            List<String> commands = new ArrayList<String>(arguments.size() + 4);
088    
089                            commands.add(java);
090                            commands.add("-cp");
091                            commands.add(classPath);
092                            commands.addAll(arguments);
093                            commands.add(ProcessExecutor.class.getName());
094    
095                            ProcessBuilder processBuilder = new ProcessBuilder(commands);
096    
097                            Process process = processBuilder.start();
098    
099                            ObjectOutputStream objectOutputStream = new ObjectOutputStream(
100                                    process.getOutputStream());
101    
102                            try {
103                                    objectOutputStream.writeObject(processCallable);
104                            }
105                            finally {
106                                    objectOutputStream.close();
107                            }
108    
109                            ExecutorService executorService = _getExecutorService();
110    
111                            SubprocessReactor subprocessReactor = new SubprocessReactor(
112                                    process);
113    
114                            try {
115                                    Future<ProcessCallable<? extends Serializable>>
116                                            futureResponseProcessCallable = executorService.submit(
117                                                    subprocessReactor);
118    
119                                    // Consider the newly created process as a managed process only
120                                    // after the subprocess reactor is taken by the thread pool
121    
122                                    _managedProcesses.add(process);
123    
124                                    return new ProcessExecutionFutureResult<T>(
125                                            futureResponseProcessCallable, process);
126                            }
127                            catch (RejectedExecutionException ree) {
128                                    process.destroy();
129    
130                                    throw new ProcessException(
131                                            "Cancelled execution because of a concurrent destroy", ree);
132                            }
133                    }
134                    catch (IOException ioe) {
135                            throw new ProcessException(ioe);
136                    }
137            }
138    
        /**
         * Entry point of the subprocess launched by <code>execute</code>. Reads a
         * {@link ProcessCallable} from standard input, invokes it and writes the
         * outcome as a serialized object to the original standard output.
         *
         * <p>
         * <code>System.out</code> and <code>System.err</code> are both replaced
         * with print streams backed by {@link ProcessOutputStream}s that share a
         * single object output stream over <code>FileDescriptor.out</code>.
         * </p>
         *
         * @param  arguments the command line arguments, which are not read by this
         *         method
         * @throws ClassNotFoundException if the class of the incoming process
         *         callable cannot be resolved
         * @throws IOException if reading the process callable or writing the
         *         outcome fails
         */
        public static void main(String[] arguments)
                throws ClassNotFoundException, IOException {

                PrintStream oldOutPrintStream = System.out;

                ObjectOutputStream objectOutputStream = null;
                ProcessOutputStream outProcessOutputStream = null;

                // Swap standard out while holding the monitor of the old print stream.
                // NOTE(review): presumably this keeps other writers of the old
                // stream from interleaving with the swap; confirm.

                synchronized (oldOutPrintStream) {
                        oldOutPrintStream.flush();

                        FileOutputStream fileOutputStream = new FileOutputStream(
                                FileDescriptor.out);

                        objectOutputStream = new ObjectOutputStream(
                                new UnsyncBufferedOutputStream(fileOutputStream));

                        // NOTE(review): the boolean flag appears to select between the
                        // regular (false) and the error (true) channel; confirm against
                        // ProcessOutputStream.

                        outProcessOutputStream = new ProcessOutputStream(
                                objectOutputStream, false);

                        // Publish the stream so that code running in this subprocess,
                        // such as the heartbeat thread, can obtain it from the context

                        ProcessContext._setProcessOutputStream(outProcessOutputStream);

                        PrintStream newOutPrintStream = new PrintStream(
                                outProcessOutputStream, true);

                        System.setOut(newOutPrintStream);
                }

                // Standard error shares the same object output stream as standard out

                ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
                        objectOutputStream, true);

                PrintStream errPrintStream = new PrintStream(
                        errProcessOutputStream, true);

                System.setErr(errPrintStream);

                try {
                        ObjectInputStream objectInputStream = new ObjectInputStream(
                                System.in);

                        ProcessCallable<?> processCallable =
                                (ProcessCallable<?>)objectInputStream.readObject();

                        // The log prefix is the process callable's string form wrapped in
                        // brackets and encoded as UTF-8

                        String logPrefixString =
                                StringPool.OPEN_BRACKET.concat(
                                        processCallable.toString()).concat(
                                                StringPool.CLOSE_BRACKET);

                        byte[] logPrefix = logPrefixString.getBytes(StringPool.UTF8);

                        outProcessOutputStream.setLogPrefix(logPrefix);
                        errProcessOutputStream.setLogPrefix(logPrefix);

                        Serializable result = processCallable.call();

                        // Flush pending console output before the result object is written

                        System.out.flush();

                        outProcessOutputStream.writeProcessCallable(
                                new ReturnProcessCallable<Serializable>(result));

                        outProcessOutputStream.flush();
                }
                catch (ProcessException pe) {

                        // Only a ProcessException is reported back as an object. Any other
                        // throwable propagates out of this method without a result being
                        // written.

                        errPrintStream.flush();

                        errProcessOutputStream.writeProcessCallable(
                                new ExceptionProcessCallable(pe));

                        errProcessOutputStream.flush();
                }
        }
210    
211            public void destroy() {
212                    if (_executorService == null) {
213                            return;
214                    }
215    
216                    synchronized (ProcessExecutor.class) {
217                            if (_executorService != null) {
218                                    _executorService.shutdownNow();
219    
220                                    // At this point, the thread pool will no longer take in any
221                                    // more subprocess reactors, so we know the list of managed
222                                    // processes is in a safe state. The worst case is that the
223                                    // destroyer thread and the thread pool thread concurrently
224                                    // destroy the same process, but this is JDK's job to ensure
225                                    // that processes are destroyed in a thread safe manner.
226    
227                                    Iterator<Process> iterator = _managedProcesses.iterator();
228    
229                                    while (iterator.hasNext()) {
230                                            Process process = iterator.next();
231    
232                                            process.destroy();
233    
234                                            iterator.remove();
235                                    }
236    
237                                    // The current thread has a more comprehensive view of the list
238                                    // of managed processes than any thread pool thread. After the
239                                    // previous iteration, we are safe to clear the list of managed
240                                    // processes.
241    
242                                    _managedProcesses.clear();
243    
244                                    _executorService = null;
245                            }
246                    }
247            }
248    
249            public static class ProcessContext {
250    
251                    public static boolean attach(
252                            String message, long interval, ShutdownHook shutdownHook) {
253    
254                            HeartbeatThread heartbeatThread = new HeartbeatThread(
255                                    message, interval, shutdownHook);
256    
257                            boolean value = _heartbeatThreadReference.compareAndSet(
258                                    null, heartbeatThread);
259    
260                            if (value) {
261                                    heartbeatThread.start();
262                            }
263    
264                            return value;
265                    }
266    
267                    public static void detach() throws InterruptedException {
268                            HeartbeatThread heartbeatThread =
269                                    _heartbeatThreadReference.getAndSet(null);
270    
271                            if (heartbeatThread != null) {
272                                    heartbeatThread.detach();
273                                    heartbeatThread.join();
274                            }
275                    }
276    
277                    public static ConcurrentMap<String, Object> getAttributes() {
278                            return _attributes;
279                    }
280    
281                    public static ProcessOutputStream getProcessOutputStream() {
282                            return _processOutputStream;
283                    }
284    
285                    public static boolean isAttached() {
286                            HeartbeatThread attachThread = _heartbeatThreadReference.get();
287    
288                            if (attachThread != null) {
289                                    return true;
290                            }
291                            else {
292                                    return false;
293                            }
294                    }
295    
296                    private static void _setProcessOutputStream(
297                            ProcessOutputStream processOutputStream) {
298    
299                            _processOutputStream = processOutputStream;
300                    }
301    
302                    private ProcessContext() {
303                    }
304    
305                    private static ConcurrentMap<String, Object> _attributes =
306                            new ConcurrentHashMap<String, Object>();
307                    private static AtomicReference<HeartbeatThread>
308                            _heartbeatThreadReference = new AtomicReference<HeartbeatThread>();
309                    private static ProcessOutputStream _processOutputStream;
310    
311            }
312    
313            public static interface ShutdownHook {
314    
315                    public static final int BROKEN_PIPE_CODE = 1;
316    
317                    public static final int INTERRUPTION_CODE = 2;
318    
319                    public static final int UNKNOWN_CODE = 3;
320    
321                    public boolean shutdown(int shutdownCode, Throwable shutdownThrowable);
322    
323            }
324    
325            private static ExecutorService _getExecutorService() {
326                    if (_executorService != null) {
327                            return _executorService;
328                    }
329    
330                    synchronized (ProcessExecutor.class) {
331                            if (_executorService == null) {
332                                    _executorService = Executors.newCachedThreadPool(
333                                            new NamedThreadFactory(
334                                                    ProcessExecutor.class.getName(), Thread.MIN_PRIORITY,
335                                                    PortalClassLoaderUtil.getClassLoader()));
336                            }
337                    }
338    
339                    return _executorService;
340            }
341    
        // Logger used by the subprocess reactor to report corrupted streams and
        // generic process callables

        private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);

        // Lazily created thread pool running the subprocess reactors. It is
        // reset to null by destroy() and is volatile because it is read outside
        // of the ProcessExecutor.class lock.

        private static volatile ExecutorService _executorService;

        // Subprocesses whose reactor was accepted by the thread pool. Entries are
        // removed by the reactor and by destroy().

        private static Set<Process> _managedProcesses =
                new ConcurrentHashSet<Process>();
347    
348            private static class HeartbeatThread extends Thread {
349    
350                    public HeartbeatThread(
351                            String message, long interval, ShutdownHook shutdownHook) {
352    
353                            if (shutdownHook == null) {
354                                    throw new IllegalArgumentException("Shutdown hook is null");
355                            }
356    
357                            _interval = interval;
358                            _shutdownHook = shutdownHook;
359    
360                            _pringBackProcessCallable = new PingbackProcessCallable(message);
361    
362                            setDaemon(true);
363                            setName(HeartbeatThread.class.getSimpleName());
364                    }
365    
366                    public void detach() {
367                            _detach = true;
368    
369                            interrupt();
370                    }
371    
372                    @Override
373                    public void run() {
374                            ProcessOutputStream processOutputStream =
375                                    ProcessContext.getProcessOutputStream();
376    
377                            int shutdownCode = 0;
378                            Throwable shutdownThrowable = null;
379    
380                            while (!_detach) {
381                                    try {
382                                            sleep(_interval);
383    
384                                            processOutputStream.writeProcessCallable(
385                                                    _pringBackProcessCallable);
386                                    }
387                                    catch (InterruptedException ie) {
388                                            if (_detach) {
389                                                    return;
390                                            }
391                                            else {
392                                                    shutdownThrowable = ie;
393    
394                                                    shutdownCode = ShutdownHook.INTERRUPTION_CODE;
395                                            }
396                                    }
397                                    catch (IOException ioe) {
398                                            shutdownThrowable = ioe;
399    
400                                            shutdownCode = ShutdownHook.BROKEN_PIPE_CODE;
401                                    }
402                                    catch (Throwable throwable) {
403                                            shutdownThrowable = throwable;
404    
405                                            shutdownCode = ShutdownHook.UNKNOWN_CODE;
406                                    }
407    
408                                    if (shutdownCode != 0) {
409                                            _detach = _shutdownHook.shutdown(
410                                                    shutdownCode, shutdownThrowable);
411                                    }
412                            }
413                    }
414    
415                    private volatile boolean _detach;
416                    private final long _interval;
417                    private final ProcessCallable<String> _pringBackProcessCallable;
418                    private final ShutdownHook _shutdownHook;
419    
420            }
421    
422            private static class PingbackProcessCallable
423                    implements ProcessCallable<String> {
424    
425                    public PingbackProcessCallable(String message) {
426                            _message = message;
427                    }
428    
429                    public String call() {
430                            return _message;
431                    }
432    
433                    private static final long serialVersionUID = 1L;
434    
435                    private final String _message;
436    
437            }
438    
439            private static class ProcessExecutionFutureResult<T> implements Future<T> {
440    
441                    public ProcessExecutionFutureResult(
442                            Future<ProcessCallable<? extends Serializable>> future,
443                            Process process) {
444    
445                            _future = future;
446                            _process = process;
447                    }
448    
449                    public boolean cancel(boolean mayInterruptIfRunning) {
450                            if (_future.isCancelled() || _future.isDone()) {
451                                    return false;
452                            }
453    
454                            _future.cancel(true);
455                            _process.destroy();
456    
457                            return true;
458                    }
459    
460                    public boolean isCancelled() {
461                            return _future.isCancelled();
462                    }
463    
464                    public boolean isDone() {
465                            return _future.isDone();
466                    }
467    
468                    public T get() throws ExecutionException, InterruptedException {
469                            ProcessCallable<?> processCallable = _future.get();
470    
471                            return get(processCallable);
472                    }
473    
474                    public T get(long timeout, TimeUnit timeUnit)
475                            throws ExecutionException, InterruptedException, TimeoutException {
476    
477                            ProcessCallable<?> processCallable = _future.get(timeout, timeUnit);
478    
479                            return get(processCallable);
480                    }
481    
482                    private T get(ProcessCallable<?> processCallable)
483                            throws ExecutionException {
484    
485                            try {
486                                    if (processCallable instanceof ReturnProcessCallable<?>) {
487                                            return (T)processCallable.call();
488                                    }
489                                    else {
490                                            ExceptionProcessCallable exceptionProcessCallable =
491                                                    (ExceptionProcessCallable)processCallable;
492    
493                                            throw exceptionProcessCallable.call();
494                                    }
495                            }
496                            catch (ProcessException pe) {
497                                    throw new ExecutionException(pe);
498                            }
499                    }
500    
501                    private final Future<ProcessCallable<?>> _future;
502                    private final Process _process;
503    
504            }
505    
506            private static class SubprocessReactor
507                    implements Callable<ProcessCallable<? extends Serializable>> {
508    
509                    public SubprocessReactor(Process process) {
510                            _process = process;
511                    }
512    
513                    public ProcessCallable<? extends Serializable> call() throws Exception {
514                            ProcessCallable<?> resultProcessCallable = null;
515    
516                            UnsyncBufferedInputStream unsyncBufferedInputStream =
517                                    new UnsyncBufferedInputStream(_process.getInputStream());
518    
519                            try {
520                                    ObjectInputStream objectInputStream = null;
521    
522                                    UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
523                                            new UnsyncByteArrayOutputStream();
524    
525                                    while (true) {
526                                            try {
527    
528                                                    // Be ready for a bad header
529    
530                                                    unsyncBufferedInputStream.mark(4);
531    
532                                                    objectInputStream = new ClassLoaderObjectInputStream(
533                                                            unsyncBufferedInputStream,
534                                                            PortalClassLoaderUtil.getClassLoader());
535    
536                                                    // Found the beginning of the object input stream. Flush
537                                                    // out corrupted log if necessary.
538    
539                                                    if (unsyncByteArrayOutputStream.size() > 0) {
540                                                            if (_log.isWarnEnabled()) {
541                                                                    _log.warn(
542                                                                            "Found corrupt leading log " +
543                                                                                    unsyncByteArrayOutputStream.toString());
544                                                            }
545                                                    }
546    
547                                                    unsyncByteArrayOutputStream = null;
548    
549                                                    break;
550                                            }
551                                            catch (StreamCorruptedException sce) {
552    
553                                                    // Collecting bad header as log information
554    
555                                                    unsyncBufferedInputStream.reset();
556    
557                                                    unsyncByteArrayOutputStream.write(
558                                                            unsyncBufferedInputStream.read());
559                                            }
560                                    }
561    
562                                    while (true) {
563                                            ProcessCallable<?> processCallable =
564                                                    (ProcessCallable<?>)objectInputStream.readObject();
565    
566                                            if ((processCallable instanceof ExceptionProcessCallable) ||
567                                                    (processCallable instanceof ReturnProcessCallable<?>)) {
568    
569                                                    resultProcessCallable = processCallable;
570    
571                                                    continue;
572                                            }
573    
574                                            Serializable returnValue = processCallable.call();
575    
576                                            if (_log.isDebugEnabled()) {
577                                                    _log.debug(
578                                                            "Invoked generic process callable " +
579                                                                    processCallable + " with return value " +
580                                                                            returnValue);
581                                            }
582                                    }
583                            }
584                            catch (StreamCorruptedException sce) {
585                                    File file = File.createTempFile(
586                                            "corrupted-stream-dump-" + System.currentTimeMillis(),
587                                            ".log");
588    
589                                    _log.error(
590                                            "Dumping content of corrupted object input stream to " +
591                                                    file.getAbsolutePath(),
592                                            sce);
593    
594                                    FileOutputStream fileOutputStream = new FileOutputStream(file);
595    
596                                    StreamUtil.transfer(
597                                            unsyncBufferedInputStream, fileOutputStream);
598    
599                                    throw new ProcessException(
600                                            "Corrupted object input stream", sce);
601                            }
602                            catch (EOFException eofe) {
603                                    throw new ProcessException(
604                                            "Subprocess piping back ended prematurely", eofe);
605                            }
606                            finally {
607                                    try {
608                                            int exitCode = _process.waitFor();
609    
610                                            if (exitCode != 0) {
611                                                    throw new ProcessException(
612                                                            "Subprocess terminated with exit code " + exitCode);
613                                            }
614                                    }
615                                    catch (InterruptedException ie) {
616                                            _process.destroy();
617    
618                                            throw new ProcessException(
619                                                    "Forcibly killed subprocess on interruption", ie);
620                                    }
621    
622                                    _managedProcesses.remove(_process);
623    
624                                    if (resultProcessCallable != null) {
625    
626                                            // Override previous process exception if there was one
627    
628                                            return resultProcessCallable;
629                                    }
630                            }
631                    }
632    
633                    private final Process _process;
634    
635            }
636    
637    }