001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.util.NamedThreadFactory;
018    import com.liferay.portal.kernel.util.ObjectValuePair;
019    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
020    
021    import java.io.IOException;
022    
023    import java.util.Arrays;
024    import java.util.List;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ExecutionException;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Executors;
029    import java.util.concurrent.Future;
030    import java.util.concurrent.RejectedExecutionException;
031    import java.util.concurrent.TimeUnit;
032    import java.util.concurrent.TimeoutException;
033    
034    /**
035     * @author Shuyang Zhou
036     */
037    public class ProcessUtil {
038    
039            public static final ConsumerOutputProcessor CONSUMER_OUTPUT_PROCESSOR =
040                    new ConsumerOutputProcessor();
041    
042            public static final LoggingOutputProcessor LOGGING_OUTPUT_PROCESSOR =
043                    new LoggingOutputProcessor();
044    
045            public static <O, E> Future<ObjectValuePair<O, E>> execute(
046                            OutputProcessor<O, E> outputProcessor, List<String> arguments)
047                    throws ProcessException {
048    
049                    if (outputProcessor == null) {
050                            throw new NullPointerException("Output processor is null");
051                    }
052    
053                    if (arguments == null) {
054                            throw new NullPointerException("Arguments is null");
055                    }
056    
057                    ProcessBuilder processBuilder = new ProcessBuilder(arguments);
058    
059                    try {
060                            Process process = processBuilder.start();
061    
062                            ExecutorService executorService = _getExecutorService();
063    
064                            try {
065                                    Future<O> stdOutFuture = executorService.submit(
066                                            new ProcessStdOutCallable<O>(outputProcessor, process));
067    
068                                    Future<E> stdErrFuture = executorService.submit(
069                                            new ProcessStdErrCallable<E>(outputProcessor, process));
070    
071                                    return new BindedFuture<O, E>(
072                                            stdOutFuture, stdErrFuture, process);
073                            }
074                            catch (RejectedExecutionException ree) {
075                                    process.destroy();
076    
077                                    throw new ProcessException(
078                                            "Cancelled execution because of a concurrent destroy", ree);
079                            }
080                    }
081                    catch (IOException e) {
082                            throw new ProcessException(e);
083                    }
084            }
085    
086            public static <O, E> Future<ObjectValuePair<O, E>> execute(
087                            OutputProcessor<O, E> outputProcessor, String... arguments)
088                    throws ProcessException {
089    
090                    return execute(outputProcessor, Arrays.asList(arguments));
091            }
092    
093            public void destroy() {
094                    if (_executorService == null) {
095                            return;
096                    }
097    
098                    synchronized (ProcessUtil.class) {
099                            if (_executorService != null) {
100                                    _executorService.shutdownNow();
101    
102                                    _executorService = null;
103                            }
104                    }
105            }
106    
107            private static ExecutorService _getExecutorService() {
108                    if (_executorService != null) {
109                            return _executorService;
110                    }
111    
112                    synchronized (ProcessUtil.class) {
113                            if (_executorService == null) {
114                                    _executorService = Executors.newCachedThreadPool(
115                                            new NamedThreadFactory(
116                                                    ProcessUtil.class.getName(), Thread.MIN_PRIORITY,
117                                                    PortalClassLoaderUtil.getClassLoader()));
118                            }
119                    }
120    
121                    return _executorService;
122            }
123    
124            private static volatile ExecutorService _executorService;
125    
126            private static class BindedFuture<O, E>
127                    implements Future<ObjectValuePair<O, E>> {
128    
129                    public BindedFuture(
130                            Future<O> stdOutFuture, Future<E> stdErrFuture, Process process) {
131    
132                            _stdOutFuture = stdOutFuture;
133                            _stdErrFuture = stdErrFuture;
134                            _process = process;
135                    }
136    
137                    public boolean cancel(boolean mayInterruptIfRunning) {
138                            if (_stdOutFuture.isCancelled() || _stdOutFuture.isDone()) {
139                                    return false;
140                            }
141    
142                            _stdErrFuture.cancel(true);
143                            _stdOutFuture.cancel(true);
144                            _process.destroy();
145    
146                            return true;
147                    }
148    
149                    public ObjectValuePair<O, E> get()
150                            throws ExecutionException, InterruptedException {
151    
152                            E stdErrResult = _stdErrFuture.get();
153                            O stdOutResult = _stdOutFuture.get();
154    
155                            return new ObjectValuePair<O, E>(stdOutResult, stdErrResult);
156                    }
157    
158                    public ObjectValuePair<O, E> get(long timeout, TimeUnit unit)
159                            throws ExecutionException, InterruptedException, TimeoutException {
160    
161                            long startTime = System.currentTimeMillis();
162    
163                            E stdErrResult = _stdErrFuture.get(timeout, unit);
164    
165                            long elapseTime = System.currentTimeMillis() - startTime;
166    
167                            long secondTimeout =
168                                    timeout - unit.convert(elapseTime, TimeUnit.MILLISECONDS);
169    
170                            O stdOutResult = _stdOutFuture.get(secondTimeout, unit);
171    
172                            return new ObjectValuePair<O, E>(stdOutResult, stdErrResult);
173                    }
174    
175                    public boolean isCancelled() {
176                            return _stdOutFuture.isCancelled();
177                    }
178    
179                    public boolean isDone() {
180                            return _stdOutFuture.isDone();
181                    }
182    
183                    private final Future<E> _stdErrFuture;
184                    private final Future<O> _stdOutFuture;
185                    private final Process _process;
186    
187            }
188    
189            private static class ProcessStdErrCallable<T> implements Callable<T> {
190    
191                    public ProcessStdErrCallable(
192                            OutputProcessor<?, T> outputProcessor, Process process) {
193    
194                            _outputProcessor = outputProcessor;
195                            _process = process;
196                    }
197    
198                    public T call() throws Exception {
199                            return _outputProcessor.processStdErr(_process.getErrorStream());
200                    }
201    
202                    private final OutputProcessor<?, T> _outputProcessor;
203                    private final Process _process;
204    
205            }
206    
207            private static class ProcessStdOutCallable<T> implements Callable<T> {
208    
209                    public ProcessStdOutCallable(
210                            OutputProcessor<T, ?> outputProcessor, Process process) {
211    
212                            _outputProcessor = outputProcessor;
213                            _process = process;
214                    }
215    
216                    public T call() throws Exception {
217                            try {
218                                    return _outputProcessor.processStdOut(
219                                            _process.getInputStream());
220                            }
221                            finally {
222                                    try {
223                                            int exitCode = _process.waitFor();
224    
225                                            if (exitCode != 0) {
226                                                    throw new ProcessException(
227                                                            "Subprocess terminated with exit code " + exitCode);
228                                            }
229                                    }
230                                    catch (InterruptedException ie) {
231                                            _process.destroy();
232    
233                                            throw new ProcessException(
234                                                    "Forcibly killed subprocess on interruption", ie);
235                                    }
236                            }
237                    }
238    
239                    private final OutputProcessor<T, ?> _outputProcessor;
240                    private final Process _process;
241    
242            }
243    
244    }