001
014
015 package com.liferay.portal.kernel.process;
016
017 import com.liferay.portal.kernel.concurrent.AbortPolicy;
018 import com.liferay.portal.kernel.concurrent.BaseFutureListener;
019 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
020 import com.liferay.portal.kernel.concurrent.FutureListener;
021 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
022 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
023 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
024 import com.liferay.portal.kernel.util.NamedThreadFactory;
025 import com.liferay.portal.kernel.util.ObjectValuePair;
026 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027
028 import java.io.IOException;
029
030 import java.util.Arrays;
031 import java.util.List;
032 import java.util.concurrent.Callable;
033 import java.util.concurrent.Future;
034 import java.util.concurrent.RejectedExecutionException;
035 import java.util.concurrent.TimeUnit;
036 import java.util.concurrent.atomic.AtomicMarkableReference;
037
038
041 public class ProcessUtil {
042
043 public static final CollectorOutputProcessor COLLECTOR_OUTPUT_PROCESSOR =
044 new CollectorOutputProcessor();
045
046 public static final ConsumerOutputProcessor CONSUMER_OUTPUT_PROCESSOR =
047 new ConsumerOutputProcessor();
048
049 public static final EchoOutputProcessor ECHO_OUTPUT_PROCESSOR =
050 new EchoOutputProcessor();
051
052 public static final LoggingOutputProcessor LOGGING_OUTPUT_PROCESSOR =
053 new LoggingOutputProcessor();
054
055 public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
056 OutputProcessor<O, E> outputProcessor, List<String> arguments)
057 throws ProcessException {
058
059 if (outputProcessor == null) {
060 throw new NullPointerException("Output processor is null");
061 }
062
063 if (arguments == null) {
064 throw new NullPointerException("Arguments is null");
065 }
066
067 ProcessBuilder processBuilder = new ProcessBuilder(arguments);
068
069 try {
070 Process process = processBuilder.start();
071
072 ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
073
074 try {
075 NoticeableFuture<O> stdOutNoticeableFuture =
076 threadPoolExecutor.submit(
077 new ProcessStdOutCallable<O>(outputProcessor, process));
078
079 NoticeableFuture<E> stdErrNoticeableFuture =
080 threadPoolExecutor.submit(
081 new ProcessStdErrCallable<E>(outputProcessor, process));
082
083 return _wrapNoticeableFuture(
084 stdOutNoticeableFuture, stdErrNoticeableFuture, process);
085 }
086 catch (RejectedExecutionException ree) {
087 process.destroy();
088
089 throw new ProcessException(
090 "Cancelled execution because of a concurrent destroy", ree);
091 }
092 }
093 catch (IOException ioe) {
094 throw new ProcessException(ioe);
095 }
096 }
097
098 public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
099 OutputProcessor<O, E> outputProcessor, String... arguments)
100 throws ProcessException {
101
102 return execute(outputProcessor, Arrays.asList(arguments));
103 }
104
105 public void destroy() {
106 if (_threadPoolExecutor == null) {
107 return;
108 }
109
110 synchronized (ProcessUtil.class) {
111 if (_threadPoolExecutor != null) {
112 _threadPoolExecutor.shutdownNow();
113
114 _threadPoolExecutor = null;
115 }
116 }
117 }
118
119 private static ThreadPoolExecutor _getThreadPoolExecutor() {
120 if (_threadPoolExecutor != null) {
121 return _threadPoolExecutor;
122 }
123
124 synchronized (ProcessUtil.class) {
125 if (_threadPoolExecutor == null) {
126 _threadPoolExecutor = new ThreadPoolExecutor(
127 0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
128 Integer.MAX_VALUE, new AbortPolicy(),
129 new NamedThreadFactory(
130 ProcessUtil.class.getName(), Thread.MIN_PRIORITY,
131 PortalClassLoaderUtil.getClassLoader()),
132 new ThreadPoolHandlerAdapter());
133 }
134 }
135
136 return _threadPoolExecutor;
137 }
138
139 private static <O, E> NoticeableFuture<ObjectValuePair<O, E>>
140 _wrapNoticeableFuture(
141 final NoticeableFuture<O> stdOutNoticeableFuture,
142 final NoticeableFuture<E> stdErrNoticeableFuture,
143 final Process process) {
144
145 final DefaultNoticeableFuture<ObjectValuePair<O, E>>
146 defaultNoticeableFuture = new DefaultNoticeableFuture<>();
147
148 defaultNoticeableFuture.addFutureListener(
149 new FutureListener<ObjectValuePair<O, E>>() {
150
151 @Override
152 public void complete(Future<ObjectValuePair<O, E>> future) {
153 if (!future.isCancelled()) {
154 return;
155 }
156
157 stdOutNoticeableFuture.cancel(true);
158
159 stdErrNoticeableFuture.cancel(true);
160
161 process.destroy();
162 }
163
164 });
165
166 final AtomicMarkableReference<O> stdOutReference =
167 new AtomicMarkableReference<>(null, false);
168
169 final AtomicMarkableReference<E> stdErrReference =
170 new AtomicMarkableReference<>(null, false);
171
172 stdOutNoticeableFuture.addFutureListener(
173 new BaseFutureListener<O>() {
174
175 @Override
176 public void completeWithCancel(Future<O> future) {
177 defaultNoticeableFuture.cancel(true);
178 }
179
180 @Override
181 public void completeWithException(
182 Future<O> future, Throwable throwable) {
183
184 defaultNoticeableFuture.setException(throwable);
185 }
186
187 @Override
188 public void completeWithResult(Future<O> future, O stdOut) {
189 stdOutReference.set(stdOut, true);
190
191 boolean[] markHolder = new boolean[1];
192
193 E stdErr = stdErrReference.get(markHolder);
194
195 if (markHolder[0]) {
196 defaultNoticeableFuture.set(
197 new ObjectValuePair<O, E>(stdOut, stdErr));
198 }
199 }
200
201 });
202
203 stdErrNoticeableFuture.addFutureListener(
204 new BaseFutureListener<E>() {
205
206 @Override
207 public void completeWithCancel(Future<E> future) {
208 defaultNoticeableFuture.cancel(true);
209 }
210
211 @Override
212 public void completeWithException(
213 Future<E> future, Throwable throwable) {
214
215 defaultNoticeableFuture.setException(throwable);
216 }
217
218 @Override
219 public void completeWithResult(Future<E> future, E stdErr) {
220 stdErrReference.set(stdErr, true);
221
222 boolean[] markHolder = new boolean[1];
223
224 O stdOut = stdOutReference.get(markHolder);
225
226 if (markHolder[0]) {
227 defaultNoticeableFuture.set(
228 new ObjectValuePair<O, E>(stdOut, stdErr));
229 }
230 }
231
232 });
233
234 return defaultNoticeableFuture;
235 }
236
237 private static volatile ThreadPoolExecutor _threadPoolExecutor;
238
239 private static class ProcessStdErrCallable<T> implements Callable<T> {
240
241 public ProcessStdErrCallable(
242 OutputProcessor<?, T> outputProcessor, Process process) {
243
244 _outputProcessor = outputProcessor;
245 _process = process;
246 }
247
248 @Override
249 public T call() throws Exception {
250 return _outputProcessor.processStdErr(_process.getErrorStream());
251 }
252
253 private final OutputProcessor<?, T> _outputProcessor;
254 private final Process _process;
255
256 }
257
258 private static class ProcessStdOutCallable<T> implements Callable<T> {
259
260 public ProcessStdOutCallable(
261 OutputProcessor<T, ?> outputProcessor, Process process) {
262
263 _outputProcessor = outputProcessor;
264 _process = process;
265 }
266
267 @Override
268 public T call() throws Exception {
269 try {
270 return _outputProcessor.processStdOut(
271 _process.getInputStream());
272 }
273 finally {
274 try {
275 int exitCode = _process.waitFor();
276
277 if (exitCode != 0) {
278 throw new TerminationProcessException(exitCode);
279 }
280 }
281 catch (InterruptedException ie) {
282 _process.destroy();
283
284 throw new ProcessException(
285 "Forcibly killed subprocess on interruption", ie);
286 }
287 }
288 }
289
290 private final OutputProcessor<T, ?> _outputProcessor;
291 private final Process _process;
292
293 }
294
295 }