001
014
015 package com.liferay.portal.kernel.process;
016
017 import com.liferay.portal.kernel.concurrent.AbortPolicy;
018 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
019 import com.liferay.portal.kernel.concurrent.FutureListener;
020 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
021 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
022 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
023 import com.liferay.portal.kernel.util.NamedThreadFactory;
024 import com.liferay.portal.kernel.util.ObjectValuePair;
025 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
026
027 import java.io.IOException;
028
029 import java.util.Arrays;
030 import java.util.List;
031 import java.util.concurrent.Callable;
032 import java.util.concurrent.ExecutionException;
033 import java.util.concurrent.Future;
034 import java.util.concurrent.RejectedExecutionException;
035 import java.util.concurrent.TimeUnit;
036 import java.util.concurrent.atomic.AtomicMarkableReference;
037
038
041 public class ProcessUtil {
042
043 public static final CollectorOutputProcessor COLLECTOR_OUTPUT_PROCESSOR =
044 new CollectorOutputProcessor();
045
046 public static final ConsumerOutputProcessor CONSUMER_OUTPUT_PROCESSOR =
047 new ConsumerOutputProcessor();
048
049 public static final LoggingOutputProcessor LOGGING_OUTPUT_PROCESSOR =
050 new LoggingOutputProcessor();
051
052 public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
053 OutputProcessor<O, E> outputProcessor, List<String> arguments)
054 throws ProcessException {
055
056 if (outputProcessor == null) {
057 throw new NullPointerException("Output processor is null");
058 }
059
060 if (arguments == null) {
061 throw new NullPointerException("Arguments is null");
062 }
063
064 ProcessBuilder processBuilder = new ProcessBuilder(arguments);
065
066 try {
067 Process process = processBuilder.start();
068
069 ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
070
071 try {
072 NoticeableFuture<O> stdOutNoticeableFuture =
073 threadPoolExecutor.submit(
074 new ProcessStdOutCallable<O>(outputProcessor, process));
075
076 NoticeableFuture<E> stdErrNoticeableFuture =
077 threadPoolExecutor.submit(
078 new ProcessStdErrCallable<E>(outputProcessor, process));
079
080 return _wrapNoticeableFuture(
081 stdOutNoticeableFuture, stdErrNoticeableFuture, process);
082 }
083 catch (RejectedExecutionException ree) {
084 process.destroy();
085
086 throw new ProcessException(
087 "Cancelled execution because of a concurrent destroy", ree);
088 }
089 }
090 catch (IOException ioe) {
091 throw new ProcessException(ioe);
092 }
093 }
094
095 public static <O, E> NoticeableFuture<ObjectValuePair<O, E>> execute(
096 OutputProcessor<O, E> outputProcessor, String... arguments)
097 throws ProcessException {
098
099 return execute(outputProcessor, Arrays.asList(arguments));
100 }
101
102 public void destroy() {
103 if (_threadPoolExecutor == null) {
104 return;
105 }
106
107 synchronized (ProcessUtil.class) {
108 if (_threadPoolExecutor != null) {
109 _threadPoolExecutor.shutdownNow();
110
111 _threadPoolExecutor = null;
112 }
113 }
114 }
115
116 private static ThreadPoolExecutor _getThreadPoolExecutor() {
117 if (_threadPoolExecutor != null) {
118 return _threadPoolExecutor;
119 }
120
121 synchronized (ProcessUtil.class) {
122 if (_threadPoolExecutor == null) {
123 _threadPoolExecutor = new ThreadPoolExecutor(
124 0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
125 Integer.MAX_VALUE, new AbortPolicy(),
126 new NamedThreadFactory(
127 ProcessUtil.class.getName(), Thread.MIN_PRIORITY,
128 PortalClassLoaderUtil.getClassLoader()),
129 new ThreadPoolHandlerAdapter());
130 }
131 }
132
133 return _threadPoolExecutor;
134 }
135
136 private static <O, E> NoticeableFuture<ObjectValuePair<O, E>>
137 _wrapNoticeableFuture(
138 final NoticeableFuture<O> stdOutNoticeableFuture,
139 final NoticeableFuture<E> stdErrNoticeableFuture,
140 final Process process) {
141
142 final DefaultNoticeableFuture<ObjectValuePair<O, E>>
143 defaultNoticeableFuture =
144 new DefaultNoticeableFuture<ObjectValuePair<O, E>>();
145
146 defaultNoticeableFuture.addFutureListener(
147 new FutureListener<ObjectValuePair<O, E>>() {
148
149 @Override
150 public void complete(Future<ObjectValuePair<O, E>> future) {
151 if (!future.isCancelled()) {
152 return;
153 }
154
155 stdOutNoticeableFuture.cancel(true);
156
157 stdErrNoticeableFuture.cancel(true);
158
159 process.destroy();
160 }
161
162 });
163
164 final AtomicMarkableReference<O> stdOutReference =
165 new AtomicMarkableReference<O>(null, false);
166
167 final AtomicMarkableReference<E> stdErrReference =
168 new AtomicMarkableReference<E>(null, false);
169
170 stdOutNoticeableFuture.addFutureListener(
171 new FutureListener<O>() {
172
173 @Override
174 public void complete(Future<O> future) {
175 try {
176 O stdOut = future.get();
177
178 stdOutReference.set(stdOut, true);
179
180 boolean[] markHolder = new boolean[1];
181
182 E stdErr = stdErrReference.get(markHolder);
183
184 if (markHolder[0]) {
185 defaultNoticeableFuture.set(
186 new ObjectValuePair<O, E>(stdOut, stdErr));
187 }
188 }
189 catch (Throwable t) {
190 if (t instanceof ExecutionException) {
191 t = t.getCause();
192 }
193
194 defaultNoticeableFuture.setException(t);
195 }
196 }
197
198 });
199
200 stdErrNoticeableFuture.addFutureListener(
201 new FutureListener<E>() {
202
203 @Override
204 public void complete(Future<E> future) {
205 try {
206 E stdErr = future.get();
207
208 stdErrReference.set(stdErr, true);
209
210 boolean[] markHolder = new boolean[1];
211
212 O stdOut = stdOutReference.get(markHolder);
213
214 if (markHolder[0]) {
215 defaultNoticeableFuture.set(
216 new ObjectValuePair<O, E>(stdOut, stdErr));
217 }
218 }
219 catch (Throwable t) {
220 if (t instanceof ExecutionException) {
221 t = t.getCause();
222 }
223
224 defaultNoticeableFuture.setException(t);
225 }
226 }
227
228 });
229
230 return defaultNoticeableFuture;
231 }
232
233 private static volatile ThreadPoolExecutor _threadPoolExecutor;
234
235 private static class ProcessStdErrCallable<T> implements Callable<T> {
236
237 public ProcessStdErrCallable(
238 OutputProcessor<?, T> outputProcessor, Process process) {
239
240 _outputProcessor = outputProcessor;
241 _process = process;
242 }
243
244 @Override
245 public T call() throws Exception {
246 return _outputProcessor.processStdErr(_process.getErrorStream());
247 }
248
249 private final OutputProcessor<?, T> _outputProcessor;
250 private final Process _process;
251
252 }
253
254 private static class ProcessStdOutCallable<T> implements Callable<T> {
255
256 public ProcessStdOutCallable(
257 OutputProcessor<T, ?> outputProcessor, Process process) {
258
259 _outputProcessor = outputProcessor;
260 _process = process;
261 }
262
263 @Override
264 public T call() throws Exception {
265 try {
266 return _outputProcessor.processStdOut(
267 _process.getInputStream());
268 }
269 finally {
270 try {
271 int exitCode = _process.waitFor();
272
273 if (exitCode != 0) {
274 throw new TerminationProcessException(exitCode);
275 }
276 }
277 catch (InterruptedException ie) {
278 _process.destroy();
279
280 throw new ProcessException(
281 "Forcibly killed subprocess on interruption", ie);
282 }
283 }
284 }
285
286 private final OutputProcessor<T, ?> _outputProcessor;
287 private final Process _process;
288
289 }
290
291 }