001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.concurrent.AbortPolicy;
018    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
019    import com.liferay.portal.kernel.concurrent.FutureListener;
020    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
021    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
022    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
023    import com.liferay.portal.kernel.util.NamedThreadFactory;
024    import com.liferay.portal.kernel.util.ObjectValuePair;
025    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
026    
027    import java.io.IOException;
028    
029    import java.util.Arrays;
030    import java.util.List;
031    import java.util.concurrent.Callable;
032    import java.util.concurrent.ExecutionException;
033    import java.util.concurrent.Future;
034    import java.util.concurrent.RejectedExecutionException;
035    import java.util.concurrent.TimeUnit;
036    import java.util.concurrent.atomic.AtomicMarkableReference;
037    
038    /**
039     * @author Shuyang Zhou
040     */
041    public class ProcessUtil {
042    
043            public static final CollectorOutputProcessor COLLECTOR_OUTPUT_PROCESSOR =
044                    new CollectorOutputProcessor();
045    
046            public static final ConsumerOutputProcessor CONSUMER_OUTPUT_PROCESSOR =
047                    new ConsumerOutputProcessor();
048    
049            public static final LoggingOutputProcessor LOGGING_OUTPUT_PROCESSOR =
050                    new LoggingOutputProcessor();
051    
052            public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
053                            OutputProcessor<O, E> outputProcessor, List<String> arguments)
054                    throws ProcessException {
055    
056                    if (outputProcessor == null) {
057                            throw new NullPointerException("Output processor is null");
058                    }
059    
060                    if (arguments == null) {
061                            throw new NullPointerException("Arguments is null");
062                    }
063    
064                    ProcessBuilder processBuilder = new ProcessBuilder(arguments);
065    
066                    try {
067                            Process process = processBuilder.start();
068    
069                            ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
070    
071                            try {
072                                    NoticeableFuture<O> stdOutNoticeableFuture =
073                                            threadPoolExecutor.submit(
074                                                    new ProcessStdOutCallable<O>(outputProcessor, process));
075    
076                                    NoticeableFuture<E> stdErrNoticeableFuture =
077                                            threadPoolExecutor.submit(
078                                                    new ProcessStdErrCallable<E>(outputProcessor, process));
079    
080                                    return _wrapNoticeableFuture(
081                                            stdOutNoticeableFuture, stdErrNoticeableFuture, process);
082                            }
083                            catch (RejectedExecutionException ree) {
084                                    process.destroy();
085    
086                                    throw new ProcessException(
087                                            "Cancelled execution because of a concurrent destroy", ree);
088                            }
089                    }
090                    catch (IOException ioe) {
091                            throw new ProcessException(ioe);
092                    }
093            }
094    
095            public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
096                            OutputProcessor<O, E> outputProcessor, String... arguments)
097                    throws ProcessException {
098    
099                    return execute(outputProcessor, Arrays.asList(arguments));
100            }
101    
102            public void destroy() {
103                    if (_threadPoolExecutor == null) {
104                            return;
105                    }
106    
107                    synchronized (ProcessUtil.class) {
108                            if (_threadPoolExecutor != null) {
109                                    _threadPoolExecutor.shutdownNow();
110    
111                                    _threadPoolExecutor = null;
112                            }
113                    }
114            }
115    
116            private static ThreadPoolExecutor _getThreadPoolExecutor() {
117                    if (_threadPoolExecutor != null) {
118                            return _threadPoolExecutor;
119                    }
120    
121                    synchronized (ProcessUtil.class) {
122                            if (_threadPoolExecutor == null) {
123                                    _threadPoolExecutor = new ThreadPoolExecutor(
124                                            0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
125                                            Integer.MAX_VALUE, new AbortPolicy(),
126                                            new NamedThreadFactory(
127                                                    ProcessUtil.class.getName(), Thread.MIN_PRIORITY,
128                                                    PortalClassLoaderUtil.getClassLoader()),
129                                            new ThreadPoolHandlerAdapter());
130                            }
131                    }
132    
133                    return _threadPoolExecutor;
134            }
135    
136            private static <O, E> NoticeableFuture<ObjectValuePair<O, E>>
137                    _wrapNoticeableFuture(
138                            final NoticeableFuture<O> stdOutNoticeableFuture,
139                            final NoticeableFuture<E> stdErrNoticeableFuture,
140                            final Process process) {
141    
142                    final DefaultNoticeableFuture<ObjectValuePair<O, E>>
143                            defaultNoticeableFuture =
144                                    new DefaultNoticeableFuture<ObjectValuePair<O, E>>();
145    
146                    defaultNoticeableFuture.addFutureListener(
147                            new FutureListener<ObjectValuePair<O, E>>() {
148    
149                                    @Override
150                                    public void complete(Future<ObjectValuePair<O, E>> future) {
151                                            if (!future.isCancelled()) {
152                                                    return;
153                                            }
154    
155                                            stdOutNoticeableFuture.cancel(true);
156    
157                                            stdErrNoticeableFuture.cancel(true);
158    
159                                            process.destroy();
160                                    }
161    
162                            });
163    
164                    final AtomicMarkableReference<O> stdOutReference =
165                            new AtomicMarkableReference<O>(null, false);
166    
167                    final AtomicMarkableReference<E> stdErrReference =
168                            new AtomicMarkableReference<E>(null, false);
169    
170                    stdOutNoticeableFuture.addFutureListener(
171                            new FutureListener<O>() {
172    
173                                    @Override
174                                    public void complete(Future<O> future) {
175                                            try {
176                                                    O stdOut = future.get();
177    
178                                                    stdOutReference.set(stdOut, true);
179    
180                                                    boolean[] markHolder = new boolean[1];
181    
182                                                    E stdErr = stdErrReference.get(markHolder);
183    
184                                                    if (markHolder[0]) {
185                                                            defaultNoticeableFuture.set(
186                                                                    new ObjectValuePair<O, E>(stdOut, stdErr));
187                                                    }
188                                            }
189                                            catch (Throwable t) {
190                                                    if (t instanceof ExecutionException) {
191                                                            t = t.getCause();
192                                                    }
193    
194                                                    defaultNoticeableFuture.setException(t);
195                                            }
196                                    }
197    
198                            });
199    
200                    stdErrNoticeableFuture.addFutureListener(
201                            new FutureListener<E>() {
202    
203                                    @Override
204                                    public void complete(Future<E> future) {
205                                            try {
206                                                    E stdErr = future.get();
207    
208                                                    stdErrReference.set(stdErr, true);
209    
210                                                    boolean[] markHolder = new boolean[1];
211    
212                                                    O stdOut = stdOutReference.get(markHolder);
213    
214                                                    if (markHolder[0]) {
215                                                            defaultNoticeableFuture.set(
216                                                                    new ObjectValuePair<O, E>(stdOut, stdErr));
217                                                    }
218                                            }
219                                            catch (Throwable t) {
220                                                    if (t instanceof ExecutionException) {
221                                                            t = t.getCause();
222                                                    }
223    
224                                                    defaultNoticeableFuture.setException(t);
225                                            }
226                                    }
227    
228                            });
229    
230                    return defaultNoticeableFuture;
231            }
232    
233            private static volatile ThreadPoolExecutor _threadPoolExecutor;
234    
235            private static class ProcessStdErrCallable<T> implements Callable<T> {
236    
237                    public ProcessStdErrCallable(
238                            OutputProcessor<?, T> outputProcessor, Process process) {
239    
240                            _outputProcessor = outputProcessor;
241                            _process = process;
242                    }
243    
244                    @Override
245                    public T call() throws Exception {
246                            return _outputProcessor.processStdErr(_process.getErrorStream());
247                    }
248    
249                    private final OutputProcessor<?, T> _outputProcessor;
250                    private final Process _process;
251    
252            }
253    
254            private static class ProcessStdOutCallable<T> implements Callable<T> {
255    
256                    public ProcessStdOutCallable(
257                            OutputProcessor<T, ?> outputProcessor, Process process) {
258    
259                            _outputProcessor = outputProcessor;
260                            _process = process;
261                    }
262    
263                    @Override
264                    public T call() throws Exception {
265                            try {
266                                    return _outputProcessor.processStdOut(
267                                            _process.getInputStream());
268                            }
269                            finally {
270                                    try {
271                                            int exitCode = _process.waitFor();
272    
273                                            if (exitCode != 0) {
274                                                    throw new TerminationProcessException(exitCode);
275                                            }
276                                    }
277                                    catch (InterruptedException ie) {
278                                            _process.destroy();
279    
280                                            throw new ProcessException(
281                                                    "Forcibly killed subprocess on interruption", ie);
282                                    }
283                            }
284                    }
285    
286                    private final OutputProcessor<T, ?> _outputProcessor;
287                    private final Process _process;
288    
289            }
290    
291    }