001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.concurrent.AbortPolicy;
018    import com.liferay.portal.kernel.concurrent.BaseFutureListener;
019    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
020    import com.liferay.portal.kernel.concurrent.FutureListener;
021    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
022    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
023    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
024    import com.liferay.portal.kernel.util.NamedThreadFactory;
025    import com.liferay.portal.kernel.util.ObjectValuePair;
026    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027    
028    import java.io.IOException;
029    
030    import java.util.Arrays;
031    import java.util.List;
032    import java.util.concurrent.Callable;
033    import java.util.concurrent.Future;
034    import java.util.concurrent.RejectedExecutionException;
035    import java.util.concurrent.TimeUnit;
036    import java.util.concurrent.atomic.AtomicMarkableReference;
037    
038    /**
039     * @author Shuyang Zhou
040     */
041    public class ProcessUtil {
042    
043            public static final CollectorOutputProcessor COLLECTOR_OUTPUT_PROCESSOR =
044                    new CollectorOutputProcessor();
045    
046            public static final ConsumerOutputProcessor CONSUMER_OUTPUT_PROCESSOR =
047                    new ConsumerOutputProcessor();
048    
049            public static final EchoOutputProcessor ECHO_OUTPUT_PROCESSOR =
050                    new EchoOutputProcessor();
051    
052            public static final LoggingOutputProcessor LOGGING_OUTPUT_PROCESSOR =
053                    new LoggingOutputProcessor();
054    
055            public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
056                            OutputProcessor<O, E> outputProcessor, List<String> arguments)
057                    throws ProcessException {
058    
059                    if (outputProcessor == null) {
060                            throw new NullPointerException("Output processor is null");
061                    }
062    
063                    if (arguments == null) {
064                            throw new NullPointerException("Arguments is null");
065                    }
066    
067                    ProcessBuilder processBuilder = new ProcessBuilder(arguments);
068    
069                    try {
070                            Process process = processBuilder.start();
071    
072                            ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
073    
074                            try {
075                                    NoticeableFuture<O> stdOutNoticeableFuture =
076                                            threadPoolExecutor.submit(
077                                                    new ProcessStdOutCallable<O>(outputProcessor, process));
078    
079                                    NoticeableFuture<E> stdErrNoticeableFuture =
080                                            threadPoolExecutor.submit(
081                                                    new ProcessStdErrCallable<E>(outputProcessor, process));
082    
083                                    return _wrapNoticeableFuture(
084                                            stdOutNoticeableFuture, stdErrNoticeableFuture, process);
085                            }
086                            catch (RejectedExecutionException ree) {
087                                    process.destroy();
088    
089                                    throw new ProcessException(
090                                            "Cancelled execution because of a concurrent destroy", ree);
091                            }
092                    }
093                    catch (IOException ioe) {
094                            throw new ProcessException(ioe);
095                    }
096            }
097    
098            public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
099                            OutputProcessor<O, E> outputProcessor, String... arguments)
100                    throws ProcessException {
101    
102                    return execute(outputProcessor, Arrays.asList(arguments));
103            }
104    
105            public void destroy() {
106                    if (_threadPoolExecutor == null) {
107                            return;
108                    }
109    
110                    synchronized (ProcessUtil.class) {
111                            if (_threadPoolExecutor != null) {
112                                    _threadPoolExecutor.shutdownNow();
113    
114                                    _threadPoolExecutor = null;
115                            }
116                    }
117            }
118    
119            private static ThreadPoolExecutor _getThreadPoolExecutor() {
120                    if (_threadPoolExecutor != null) {
121                            return _threadPoolExecutor;
122                    }
123    
124                    synchronized (ProcessUtil.class) {
125                            if (_threadPoolExecutor == null) {
126                                    _threadPoolExecutor = new ThreadPoolExecutor(
127                                            0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
128                                            Integer.MAX_VALUE, new AbortPolicy(),
129                                            new NamedThreadFactory(
130                                                    ProcessUtil.class.getName(), Thread.MIN_PRIORITY,
131                                                    PortalClassLoaderUtil.getClassLoader()),
132                                            new ThreadPoolHandlerAdapter());
133                            }
134                    }
135    
136                    return _threadPoolExecutor;
137            }
138    
139            private static <O, E> NoticeableFuture<ObjectValuePair<O, E>>
140                    _wrapNoticeableFuture(
141                            final NoticeableFuture<O> stdOutNoticeableFuture,
142                            final NoticeableFuture<E> stdErrNoticeableFuture,
143                            final Process process) {
144    
145                    final DefaultNoticeableFuture<ObjectValuePair<O, E>>
146                            defaultNoticeableFuture = new DefaultNoticeableFuture<>();
147    
148                    defaultNoticeableFuture.addFutureListener(
149                            new FutureListener<ObjectValuePair<O, E>>() {
150    
151                                    @Override
152                                    public void complete(Future<ObjectValuePair<O, E>> future) {
153                                            if (!future.isCancelled()) {
154                                                    return;
155                                            }
156    
157                                            stdOutNoticeableFuture.cancel(true);
158    
159                                            stdErrNoticeableFuture.cancel(true);
160    
161                                            process.destroy();
162                                    }
163    
164                            });
165    
166                    final AtomicMarkableReference<O> stdOutReference =
167                            new AtomicMarkableReference<>(null, false);
168    
169                    final AtomicMarkableReference<E> stdErrReference =
170                            new AtomicMarkableReference<>(null, false);
171    
172                    stdOutNoticeableFuture.addFutureListener(
173                            new BaseFutureListener<O>() {
174    
175                                    @Override
176                                    public void completeWithCancel(Future<O> future) {
177                                            defaultNoticeableFuture.cancel(true);
178                                    }
179    
180                                    @Override
181                                    public void completeWithException(
182                                            Future<O> future, Throwable throwable) {
183    
184                                            defaultNoticeableFuture.setException(throwable);
185                                    }
186    
187                                    @Override
188                                    public void completeWithResult(Future<O> future, O stdOut) {
189                                            stdOutReference.set(stdOut, true);
190    
191                                            boolean[] markHolder = new boolean[1];
192    
193                                            E stdErr = stdErrReference.get(markHolder);
194    
195                                            if (markHolder[0]) {
196                                                    defaultNoticeableFuture.set(
197                                                            new ObjectValuePair<O, E>(stdOut, stdErr));
198                                            }
199                                    }
200    
201                            });
202    
203                    stdErrNoticeableFuture.addFutureListener(
204                            new BaseFutureListener<E>() {
205    
206                                    @Override
207                                    public void completeWithCancel(Future<E> future) {
208                                            defaultNoticeableFuture.cancel(true);
209                                    }
210    
211                                    @Override
212                                    public void completeWithException(
213                                            Future<E> future, Throwable throwable) {
214    
215                                            defaultNoticeableFuture.setException(throwable);
216                                    }
217    
218                                    @Override
219                                    public void completeWithResult(Future<E> future, E stdErr) {
220                                            stdErrReference.set(stdErr, true);
221    
222                                            boolean[] markHolder = new boolean[1];
223    
224                                            O stdOut = stdOutReference.get(markHolder);
225    
226                                            if (markHolder[0]) {
227                                                    defaultNoticeableFuture.set(
228                                                            new ObjectValuePair<O, E>(stdOut, stdErr));
229                                            }
230                                    }
231    
232                            });
233    
234                    return defaultNoticeableFuture;
235            }
236    
237            private static volatile ThreadPoolExecutor _threadPoolExecutor;
238    
239            private static class ProcessStdErrCallable<T> implements Callable<T> {
240    
241                    public ProcessStdErrCallable(
242                            OutputProcessor<?, T> outputProcessor, Process process) {
243    
244                            _outputProcessor = outputProcessor;
245                            _process = process;
246                    }
247    
248                    @Override
249                    public T call() throws Exception {
250                            return _outputProcessor.processStdErr(_process.getErrorStream());
251                    }
252    
253                    private final OutputProcessor<?, T> _outputProcessor;
254                    private final Process _process;
255    
256            }
257    
258            private static class ProcessStdOutCallable<T> implements Callable<T> {
259    
260                    public ProcessStdOutCallable(
261                            OutputProcessor<T, ?> outputProcessor, Process process) {
262    
263                            _outputProcessor = outputProcessor;
264                            _process = process;
265                    }
266    
267                    @Override
268                    public T call() throws Exception {
269                            try {
270                                    return _outputProcessor.processStdOut(
271                                            _process.getInputStream());
272                            }
273                            finally {
274                                    try {
275                                            int exitCode = _process.waitFor();
276    
277                                            if (exitCode != 0) {
278                                                    throw new TerminationProcessException(exitCode);
279                                            }
280                                    }
281                                    catch (InterruptedException ie) {
282                                            _process.destroy();
283    
284                                            throw new ProcessException(
285                                                    "Forcibly killed subprocess on interruption", ie);
286                                    }
287                            }
288                    }
289    
290                    private final OutputProcessor<T, ?> _outputProcessor;
291                    private final Process _process;
292    
293            }
294    
295    }