001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
019    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedOutputStream;
020    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.process.log.ProcessOutputStream;
024    import com.liferay.portal.kernel.util.NamedThreadFactory;
025    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
026    import com.liferay.portal.kernel.util.StreamUtil;
027    import com.liferay.portal.kernel.util.StringPool;
028    
029    import java.io.EOFException;
030    import java.io.File;
031    import java.io.FileDescriptor;
032    import java.io.FileOutputStream;
033    import java.io.IOException;
034    import java.io.ObjectInputStream;
035    import java.io.ObjectOutputStream;
036    import java.io.PrintStream;
037    import java.io.Serializable;
038    import java.io.StreamCorruptedException;
039    
040    import java.util.ArrayList;
041    import java.util.Collections;
042    import java.util.Iterator;
043    import java.util.List;
044    import java.util.Set;
045    import java.util.concurrent.Callable;
046    import java.util.concurrent.ConcurrentHashMap;
047    import java.util.concurrent.ConcurrentMap;
048    import java.util.concurrent.ExecutionException;
049    import java.util.concurrent.ExecutorService;
050    import java.util.concurrent.Executors;
051    import java.util.concurrent.Future;
052    import java.util.concurrent.RejectedExecutionException;
053    import java.util.concurrent.TimeUnit;
054    import java.util.concurrent.TimeoutException;
055    import java.util.concurrent.atomic.AtomicReference;
056    
057    /**
058     * @author Shuyang Zhou
059     */
060    public class ProcessExecutor {
061    
062            public static <T extends Serializable> Future<T> execute(
063                            String classPath, List<String> arguments,
064                            ProcessCallable<? extends Serializable> processCallable)
065                    throws ProcessException {
066    
067                    return execute("java", classPath, arguments, processCallable);
068            }
069    
070            public static <T extends Serializable> Future<T> execute(
071                            String classPath,
072                            ProcessCallable<? extends Serializable> processCallable)
073                    throws ProcessException {
074    
075                    return execute(
076                            "java", classPath, Collections.<String>emptyList(),
077                            processCallable);
078            }
079    
080            public static <T extends Serializable> Future<T> execute(
081                            String java, String classPath, List<String> arguments,
082                            ProcessCallable<? extends Serializable> processCallable)
083                    throws ProcessException {
084    
085                    try {
086                            List<String> commands = new ArrayList<String>(arguments.size() + 4);
087    
088                            commands.add(java);
089                            commands.add("-cp");
090                            commands.add(classPath);
091                            commands.addAll(arguments);
092                            commands.add(ProcessExecutor.class.getName());
093    
094                            ProcessBuilder processBuilder = new ProcessBuilder(commands);
095    
096                            Process process = processBuilder.start();
097    
098                            ObjectOutputStream objectOutputStream = new ObjectOutputStream(
099                                    process.getOutputStream());
100    
101                            try {
102                                    objectOutputStream.writeObject(processCallable);
103                            }
104                            finally {
105                                    objectOutputStream.close();
106                            }
107    
108                            ExecutorService executorService = _getExecutorService();
109    
110                            SubprocessReactor subprocessReactor = new SubprocessReactor(
111                                    process);
112    
113                            try {
114                                    Future<ProcessCallable<? extends Serializable>>
115                                            futureResponseProcessCallable = executorService.submit(
116                                                    subprocessReactor);
117    
118                                    // Consider the newly created process as a managed process only
119                                    // after the subprocess reactor is taken by the thread pool
120    
121                                    _managedProcesses.add(process);
122    
123                                    return new ProcessExecutionFutureResult<T>(
124                                            futureResponseProcessCallable, process);
125                            }
126                            catch (RejectedExecutionException ree) {
127                                    process.destroy();
128    
129                                    throw new ProcessException(
130                                            "Cancelled execution because of a concurrent destroy", ree);
131                            }
132                    }
133                    catch (IOException ioe) {
134                            throw new ProcessException(ioe);
135                    }
136            }
137    
138            public static void main(String[] arguments)
139                    throws ClassNotFoundException, IOException {
140    
141                    PrintStream oldOutPrintStream = System.out;
142    
143                    ObjectOutputStream objectOutputStream = null;
144                    ProcessOutputStream outProcessOutputStream = null;
145    
146                    synchronized (oldOutPrintStream) {
147                            oldOutPrintStream.flush();
148    
149                            FileOutputStream fileOutputStream = new FileOutputStream(
150                                    FileDescriptor.out);
151    
152                            objectOutputStream = new ObjectOutputStream(
153                                    new UnsyncBufferedOutputStream(fileOutputStream));
154    
155                            outProcessOutputStream = new ProcessOutputStream(
156                                    objectOutputStream, false);
157    
158                            ProcessContext._setProcessOutputStream(outProcessOutputStream);
159    
160                            PrintStream newOutPrintStream = new PrintStream(
161                                    outProcessOutputStream, true);
162    
163                            System.setOut(newOutPrintStream);
164                    }
165    
166                    ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
167                            objectOutputStream, true);
168    
169                    PrintStream errPrintStream = new PrintStream(
170                            errProcessOutputStream, true);
171    
172                    System.setErr(errPrintStream);
173    
174                    try {
175                            ObjectInputStream objectInputStream = new ObjectInputStream(
176                                    System.in);
177    
178                            ProcessCallable<?> processCallable =
179                                    (ProcessCallable<?>)objectInputStream.readObject();
180    
181                            String logPrefixString =
182                                    StringPool.OPEN_BRACKET.concat(
183                                            processCallable.toString()).concat(
184                                                    StringPool.CLOSE_BRACKET);
185    
186                            byte[] logPrefix = logPrefixString.getBytes(StringPool.UTF8);
187    
188                            outProcessOutputStream.setLogPrefix(logPrefix);
189                            errProcessOutputStream.setLogPrefix(logPrefix);
190    
191                            Serializable result = processCallable.call();
192    
193                            System.out.flush();
194    
195                            outProcessOutputStream.writeProcessCallable(
196                                    new ReturnProcessCallable<Serializable>(result));
197    
198                            outProcessOutputStream.flush();
199                    }
200                    catch (ProcessException pe) {
201                            errPrintStream.flush();
202    
203                            errProcessOutputStream.writeProcessCallable(
204                                    new ExceptionProcessCallable(pe));
205    
206                            errProcessOutputStream.flush();
207                    }
208            }
209    
210            public void destroy() {
211                    if (_executorService == null) {
212                            return;
213                    }
214    
215                    synchronized (ProcessExecutor.class) {
216                            if (_executorService != null) {
217                                    _executorService.shutdownNow();
218    
219                                    // At this point, the thread pool will no longer take in any
220                                    // more subprocess reactors, so we know the list of managed
221                                    // processes is in a safe state. The worst case is that the
222                                    // destroyer thread and the thread pool thread concurrently
223                                    // destroy the same process, but this is JDK's job to ensure
224                                    // that processes are destroyed in a thread safe manner.
225    
226                                    Iterator<Process> iterator = _managedProcesses.iterator();
227    
228                                    while (iterator.hasNext()) {
229                                            Process process = iterator.next();
230    
231                                            process.destroy();
232    
233                                            iterator.remove();
234                                    }
235    
236                                    // The current thread has a more comprehensive view of the list
237                                    // of managed processes than any thread pool thread. After the
238                                    // previous iteration, we are safe to clear the list of managed
239                                    // processes.
240    
241                                    _managedProcesses.clear();
242    
243                                    _executorService = null;
244                            }
245                    }
246            }
247    
248            public static class ProcessContext {
249    
250                    public static boolean attach(
251                            String message, long interval, ShutdownHook shutdownHook) {
252    
253                            HeartbeatThread heartbeatThread = new HeartbeatThread(
254                                    message, interval, shutdownHook);
255    
256                            boolean value = _heartbeatThreadReference.compareAndSet(
257                                    null, heartbeatThread);
258    
259                            if (value) {
260                                    heartbeatThread.start();
261                            }
262    
263                            return value;
264                    }
265    
266                    public static void detach() throws InterruptedException {
267                            HeartbeatThread heartbeatThread =
268                                    _heartbeatThreadReference.getAndSet(null);
269    
270                            if (heartbeatThread != null) {
271                                    heartbeatThread.detach();
272                                    heartbeatThread.join();
273                            }
274                    }
275    
276                    public static ConcurrentMap<String, Object> getAttributes() {
277                            return _attributes;
278                    }
279    
280                    public static ProcessOutputStream getProcessOutputStream() {
281                            return _processOutputStream;
282                    }
283    
284                    public static boolean isAttached() {
285                            HeartbeatThread attachThread = _heartbeatThreadReference.get();
286    
287                            if (attachThread != null) {
288                                    return true;
289                            }
290                            else {
291                                    return false;
292                            }
293                    }
294    
295                    private static void _setProcessOutputStream(
296                            ProcessOutputStream processOutputStream) {
297    
298                            _processOutputStream = processOutputStream;
299                    }
300    
301                    private ProcessContext() {
302                    }
303    
304                    private static ConcurrentMap<String, Object> _attributes =
305                            new ConcurrentHashMap<String, Object>();
306                    private static AtomicReference<HeartbeatThread>
307                            _heartbeatThreadReference = new AtomicReference<HeartbeatThread>();
308                    private static ProcessOutputStream _processOutputStream;
309    
310            }
311    
312            public static interface ShutdownHook {
313    
314                    public static final int BROKEN_PIPE_CODE = 1;
315    
316                    public static final int INTERRUPTION_CODE = 2;
317    
318                    public static final int UNKNOWN_CODE = 3;
319    
320                    public boolean shutdown(int shutdownCode, Throwable shutdownThrowable);
321    
322            }
323    
324            private static ExecutorService _getExecutorService() {
325                    if (_executorService != null) {
326                            return _executorService;
327                    }
328    
329                    synchronized (ProcessExecutor.class) {
330                            if (_executorService == null) {
331                                    _executorService = Executors.newCachedThreadPool(
332                                            new NamedThreadFactory(
333                                                    ProcessExecutor.class.getName(), Thread.MIN_PRIORITY,
334                                                    PortalClassLoaderUtil.getClassLoader()));
335                            }
336                    }
337    
338                    return _executorService;
339            }
340    
341            private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);
342    
343            private static volatile ExecutorService _executorService;
344            private static Set<Process> _managedProcesses =
345                    new ConcurrentHashSet<Process>();
346    
347            private static class HeartbeatThread extends Thread {
348    
349                    public HeartbeatThread(
350                            String message, long interval, ShutdownHook shutdownHook) {
351    
352                            if (shutdownHook == null) {
353                                    throw new IllegalArgumentException("Shutdown hook is null");
354                            }
355    
356                            _interval = interval;
357                            _shutdownHook = shutdownHook;
358    
359                            _pringBackProcessCallable = new PingbackProcessCallable(message);
360    
361                            setDaemon(true);
362                            setName(HeartbeatThread.class.getSimpleName());
363                    }
364    
365                    public void detach() {
366                            _detach = true;
367    
368                            interrupt();
369                    }
370    
371                    @Override
372                    public void run() {
373                            ProcessOutputStream processOutputStream =
374                                    ProcessContext.getProcessOutputStream();
375    
376                            int shutdownCode = 0;
377                            Throwable shutdownThrowable = null;
378    
379                            while (!_detach) {
380                                    try {
381                                            sleep(_interval);
382    
383                                            processOutputStream.writeProcessCallable(
384                                                    _pringBackProcessCallable);
385                                    }
386                                    catch (InterruptedException ie) {
387                                            if (_detach) {
388                                                    return;
389                                            }
390                                            else {
391                                                    shutdownThrowable = ie;
392    
393                                                    shutdownCode = ShutdownHook.INTERRUPTION_CODE;
394                                            }
395                                    }
396                                    catch (IOException ioe) {
397                                            shutdownThrowable = ioe;
398    
399                                            shutdownCode = ShutdownHook.BROKEN_PIPE_CODE;
400                                    }
401                                    catch (Throwable throwable) {
402                                            shutdownThrowable = throwable;
403    
404                                            shutdownCode = ShutdownHook.UNKNOWN_CODE;
405                                    }
406    
407                                    if (shutdownCode != 0) {
408                                            _detach = _shutdownHook.shutdown(
409                                                    shutdownCode, shutdownThrowable);
410                                    }
411                            }
412                    }
413    
414                    private volatile boolean _detach;
415                    private final long _interval;
416                    private final ProcessCallable<String> _pringBackProcessCallable;
417                    private final ShutdownHook _shutdownHook;
418    
419            }
420    
421            private static class PingbackProcessCallable
422                    implements ProcessCallable<String> {
423    
424                    public PingbackProcessCallable(String message) {
425                            _message = message;
426                    }
427    
428                    public String call() {
429                            return _message;
430                    }
431    
432                    private final String _message;
433    
434            }
435    
436            private static class ProcessExecutionFutureResult<T> implements Future<T> {
437    
438                    public ProcessExecutionFutureResult(
439                            Future<ProcessCallable<? extends Serializable>> future,
440                            Process process) {
441    
442                            _future = future;
443                            _process = process;
444                    }
445    
446                    public boolean cancel(boolean mayInterruptIfRunning) {
447                            if (_future.isCancelled() || _future.isDone()) {
448                                    return false;
449                            }
450    
451                            _future.cancel(true);
452                            _process.destroy();
453    
454                            return true;
455                    }
456    
457                    public boolean isCancelled() {
458                            return _future.isCancelled();
459                    }
460    
461                    public boolean isDone() {
462                            return _future.isDone();
463                    }
464    
465                    public T get() throws ExecutionException, InterruptedException {
466                            ProcessCallable<?> processCallable = _future.get();
467    
468                            return get(processCallable);
469                    }
470    
471                    public T get(long timeout, TimeUnit timeUnit)
472                            throws ExecutionException, InterruptedException, TimeoutException {
473    
474                            ProcessCallable<?> processCallable = _future.get(timeout, timeUnit);
475    
476                            return get(processCallable);
477                    }
478    
479                    private T get(ProcessCallable<?> processCallable)
480                            throws ExecutionException {
481    
482                            try {
483                                    if (processCallable instanceof ReturnProcessCallable<?>) {
484                                            return (T)processCallable.call();
485                                    }
486                                    else {
487                                            ExceptionProcessCallable exceptionProcessCallable =
488                                                    (ExceptionProcessCallable)processCallable;
489    
490                                            throw exceptionProcessCallable.call();
491                                    }
492                            }
493                            catch (ProcessException pe) {
494                                    throw new ExecutionException(pe);
495                            }
496                    }
497    
498                    private final Future<ProcessCallable<?>> _future;
499                    private final Process _process;
500    
501            }
502    
503            private static class SubprocessReactor
504                    implements Callable<ProcessCallable<? extends Serializable>> {
505    
506                    public SubprocessReactor(Process process) {
507                            _process = process;
508                    }
509    
510                    public ProcessCallable<? extends Serializable> call() throws Exception {
511                            ProcessCallable<?> resultProcessCallable = null;
512    
513                            UnsyncBufferedInputStream unsyncBufferedInputStream =
514                                    new UnsyncBufferedInputStream(_process.getInputStream());
515    
516                            try {
517                                    ObjectInputStream objectInputStream = null;
518    
519                                    UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
520                                            new UnsyncByteArrayOutputStream();
521    
522                                    while (true) {
523                                            try {
524    
525                                                    // Be ready for a bad header
526    
527                                                    unsyncBufferedInputStream.mark(4);
528    
529                                                    objectInputStream =
530                                                            new PortalClassLoaderObjectInputStream(
531                                                                    unsyncBufferedInputStream);
532    
533                                                    // Found the beginning of the object input stream. Flush
534                                                    // out corrupted log if necessary.
535    
536                                                    if (unsyncByteArrayOutputStream.size() > 0) {
537                                                            if (_log.isWarnEnabled()) {
538                                                                    _log.warn(
539                                                                            "Found corrupt leading log " +
540                                                                                    unsyncByteArrayOutputStream.toString());
541                                                            }
542                                                    }
543    
544                                                    unsyncByteArrayOutputStream = null;
545    
546                                                    break;
547                                            }
548                                            catch (StreamCorruptedException sce) {
549    
550                                                    // Collecting bad header as log information
551    
552                                                    unsyncBufferedInputStream.reset();
553    
554                                                    unsyncByteArrayOutputStream.write(
555                                                            unsyncBufferedInputStream.read());
556                                            }
557                                    }
558    
559                                    while (true) {
560                                            ProcessCallable<?> processCallable =
561                                                    (ProcessCallable<?>)objectInputStream.readObject();
562    
563                                            if ((processCallable instanceof ExceptionProcessCallable) ||
564                                                    (processCallable instanceof ReturnProcessCallable<?>)) {
565    
566                                                    resultProcessCallable = processCallable;
567    
568                                                    continue;
569                                            }
570    
571                                            Serializable returnValue = processCallable.call();
572    
573                                            if (_log.isDebugEnabled()) {
574                                                    _log.debug(
575                                                            "Invoked generic process callable " +
576                                                                    processCallable + " with return value " +
577                                                                            returnValue);
578                                            }
579                                    }
580                            }
581                            catch (StreamCorruptedException sce) {
582                                    File file = File.createTempFile(
583                                            "corrupted-stream-dump-" + System.currentTimeMillis(),
584                                            ".log");
585    
586                                    _log.error(
587                                            "Dumping content of corrupted object input stream to " +
588                                                    file.getAbsolutePath(),
589                                            sce);
590    
591                                    FileOutputStream fileOutputStream = new FileOutputStream(file);
592    
593                                    StreamUtil.transfer(
594                                            unsyncBufferedInputStream, fileOutputStream);
595    
596                                    throw new ProcessException(
597                                            "Corrupted object input stream", sce);
598                            }
599                            catch (EOFException eofe) {
600                                    throw new ProcessException(
601                                            "Subprocess piping back ended prematurely", eofe);
602                            }
603                            finally {
604                                    try {
605                                            int exitCode = _process.waitFor();
606    
607                                            if (exitCode != 0) {
608                                                    throw new ProcessException(
609                                                            "Subprocess terminated with exit code " + exitCode);
610                                            }
611                                    }
612                                    catch (InterruptedException ie) {
613                                            _process.destroy();
614    
615                                            throw new ProcessException(
616                                                    "Forcibly killed subprocess on interruption", ie);
617                                    }
618    
619                                    _managedProcesses.remove(_process);
620    
621                                    if (resultProcessCallable != null) {
622    
623                                            // Override previous process exception if there was one
624    
625                                            return resultProcessCallable;
626                                    }
627                            }
628                    }
629    
630                    private final Process _process;
631    
632            }
633    
634    }