001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.concurrent.AbortPolicy;
018    import com.liferay.portal.kernel.concurrent.BaseFutureListener;
019    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
020    import com.liferay.portal.kernel.concurrent.FutureListener;
021    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
022    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
023    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
024    import com.liferay.portal.kernel.util.NamedThreadFactory;
025    import com.liferay.portal.kernel.util.ObjectValuePair;
026    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027    
028    import java.io.IOException;
029    
030    import java.util.Arrays;
031    import java.util.List;
032    import java.util.concurrent.Callable;
033    import java.util.concurrent.Future;
034    import java.util.concurrent.RejectedExecutionException;
035    import java.util.concurrent.TimeUnit;
036    import java.util.concurrent.atomic.AtomicMarkableReference;
037    
038    /**
039     * @author Shuyang Zhou
040     */
041    public class ProcessUtil {
042    
043            public static final CollectorOutputProcessor COLLECTOR_OUTPUT_PROCESSOR =
044                    new CollectorOutputProcessor();
045    
046            public static final ConsumerOutputProcessor CONSUMER_OUTPUT_PROCESSOR =
047                    new ConsumerOutputProcessor();
048    
049            public static final EchoOutputProcessor ECHO_OUTPUT_PROCESSOR =
050                    new EchoOutputProcessor();
051    
052            public static final LoggingOutputProcessor LOGGING_OUTPUT_PROCESSOR =
053                    new LoggingOutputProcessor();
054    
055            public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
056                            OutputProcessor<O, E> outputProcessor, List<String> arguments)
057                    throws ProcessException {
058    
059                    if (outputProcessor == null) {
060                            throw new NullPointerException("Output processor is null");
061                    }
062    
063                    if (arguments == null) {
064                            throw new NullPointerException("Arguments is null");
065                    }
066    
067                    ProcessBuilder processBuilder = new ProcessBuilder(arguments);
068    
069                    try {
070                            Process process = processBuilder.start();
071    
072                            ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
073    
074                            try {
075                                    NoticeableFuture<O> stdOutNoticeableFuture =
076                                            threadPoolExecutor.submit(
077                                                    new ProcessStdOutCallable<O>(outputProcessor, process));
078    
079                                    NoticeableFuture<E> stdErrNoticeableFuture =
080                                            threadPoolExecutor.submit(
081                                                    new ProcessStdErrCallable<E>(outputProcessor, process));
082    
083                                    return _wrapNoticeableFuture(
084                                            stdOutNoticeableFuture, stdErrNoticeableFuture, process);
085                            }
086                            catch (RejectedExecutionException ree) {
087                                    process.destroy();
088    
089                                    throw new ProcessException(
090                                            "Cancelled execution because of a concurrent destroy", ree);
091                            }
092                    }
093                    catch (IOException ioe) {
094                            throw new ProcessException(ioe);
095                    }
096            }
097    
098            public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
099                            OutputProcessor<O, E> outputProcessor, String... arguments)
100                    throws ProcessException {
101    
102                    return execute(outputProcessor, Arrays.asList(arguments));
103            }
104    
105            public void destroy() {
106                    if (_threadPoolExecutor == null) {
107                            return;
108                    }
109    
110                    synchronized (ProcessUtil.class) {
111                            if (_threadPoolExecutor != null) {
112                                    _threadPoolExecutor.shutdownNow();
113    
114                                    _threadPoolExecutor = null;
115                            }
116                    }
117            }
118    
119            private static ThreadPoolExecutor _getThreadPoolExecutor() {
120                    if (_threadPoolExecutor != null) {
121                            return _threadPoolExecutor;
122                    }
123    
124                    synchronized (ProcessUtil.class) {
125                            if (_threadPoolExecutor == null) {
126                                    _threadPoolExecutor = new ThreadPoolExecutor(
127                                            0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
128                                            Integer.MAX_VALUE, new AbortPolicy(),
129                                            new NamedThreadFactory(
130                                                    ProcessUtil.class.getName(), Thread.MIN_PRIORITY,
131                                                    PortalClassLoaderUtil.getClassLoader()),
132                                            new ThreadPoolHandlerAdapter());
133                            }
134                    }
135    
136                    return _threadPoolExecutor;
137            }
138    
139            private static <O, E> NoticeableFuture<ObjectValuePair<O, E>>
140                    _wrapNoticeableFuture(
141                            final NoticeableFuture<O> stdOutNoticeableFuture,
142                            final NoticeableFuture<E> stdErrNoticeableFuture,
143                            final Process process) {
144    
145                    final DefaultNoticeableFuture<ObjectValuePair<O, E>>
146                            defaultNoticeableFuture =
147                                    new DefaultNoticeableFuture<ObjectValuePair<O, E>>();
148    
149                    defaultNoticeableFuture.addFutureListener(
150                            new FutureListener<ObjectValuePair<O, E>>() {
151    
152                                    @Override
153                                    public void complete(Future<ObjectValuePair<O, E>> future) {
154                                            if (!future.isCancelled()) {
155                                                    return;
156                                            }
157    
158                                            stdOutNoticeableFuture.cancel(true);
159    
160                                            stdErrNoticeableFuture.cancel(true);
161    
162                                            process.destroy();
163                                    }
164    
165                            });
166    
167                    final AtomicMarkableReference<O> stdOutReference =
168                            new AtomicMarkableReference<O>(null, false);
169    
170                    final AtomicMarkableReference<E> stdErrReference =
171                            new AtomicMarkableReference<E>(null, false);
172    
173                    stdOutNoticeableFuture.addFutureListener(
174                            new BaseFutureListener<O>() {
175    
176                                    @Override
177                                    public void completeWithCancel(Future<O> future) {
178                                            defaultNoticeableFuture.cancel(true);
179                                    }
180    
181                                    @Override
182                                    public void completeWithException(
183                                            Future<O> future, Throwable throwable) {
184    
185                                            defaultNoticeableFuture.setException(throwable);
186                                    }
187    
188                                    @Override
189                                    public void completeWithResult(Future<O> future, O stdOut) {
190                                            stdOutReference.set(stdOut, true);
191    
192                                            boolean[] markHolder = new boolean[1];
193    
194                                            E stdErr = stdErrReference.get(markHolder);
195    
196                                            if (markHolder[0]) {
197                                                    defaultNoticeableFuture.set(
198                                                            new ObjectValuePair<O, E>(stdOut, stdErr));
199                                            }
200                                    }
201    
202                            });
203    
204                    stdErrNoticeableFuture.addFutureListener(
205                            new BaseFutureListener<E>() {
206    
207                                    @Override
208                                    public void completeWithCancel(Future<E> future) {
209                                            defaultNoticeableFuture.cancel(true);
210                                    }
211    
212                                    @Override
213                                    public void completeWithException(
214                                            Future<E> future, Throwable throwable) {
215    
216                                            defaultNoticeableFuture.setException(throwable);
217                                    }
218    
219                                    @Override
220                                    public void completeWithResult(Future<E> future, E stdErr) {
221                                            stdErrReference.set(stdErr, true);
222    
223                                            boolean[] markHolder = new boolean[1];
224    
225                                            O stdOut = stdOutReference.get(markHolder);
226    
227                                            if (markHolder[0]) {
228                                                    defaultNoticeableFuture.set(
229                                                            new ObjectValuePair<O, E>(stdOut, stdErr));
230                                            }
231                                    }
232    
233                            });
234    
235                    return defaultNoticeableFuture;
236            }
237    
238            private static volatile ThreadPoolExecutor _threadPoolExecutor;
239    
240            private static class ProcessStdErrCallable<T> implements Callable<T> {
241    
242                    public ProcessStdErrCallable(
243                            OutputProcessor<?, T> outputProcessor, Process process) {
244    
245                            _outputProcessor = outputProcessor;
246                            _process = process;
247                    }
248    
249                    @Override
250                    public T call() throws Exception {
251                            return _outputProcessor.processStdErr(_process.getErrorStream());
252                    }
253    
254                    private final OutputProcessor<?, T> _outputProcessor;
255                    private final Process _process;
256    
257            }
258    
259            private static class ProcessStdOutCallable<T> implements Callable<T> {
260    
261                    public ProcessStdOutCallable(
262                            OutputProcessor<T, ?> outputProcessor, Process process) {
263    
264                            _outputProcessor = outputProcessor;
265                            _process = process;
266                    }
267    
268                    @Override
269                    public T call() throws Exception {
270                            try {
271                                    return _outputProcessor.processStdOut(
272                                            _process.getInputStream());
273                            }
274                            finally {
275                                    try {
276                                            int exitCode = _process.waitFor();
277    
278                                            if (exitCode != 0) {
279                                                    throw new TerminationProcessException(exitCode);
280                                            }
281                                    }
282                                    catch (InterruptedException ie) {
283                                            _process.destroy();
284    
285                                            throw new ProcessException(
286                                                    "Forcibly killed subprocess on interruption", ie);
287                                    }
288                            }
289                    }
290    
291                    private final OutputProcessor<T, ?> _outputProcessor;
292                    private final Process _process;
293    
294            }
295    
296    }