001
014
015 package com.liferay.portal.kernel.process;
016
017 import com.liferay.portal.kernel.util.NamedThreadFactory;
018 import com.liferay.portal.kernel.util.ObjectValuePair;
019 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
020
021 import java.io.IOException;
022
023 import java.util.Arrays;
024 import java.util.List;
025 import java.util.concurrent.Callable;
026 import java.util.concurrent.ExecutionException;
027 import java.util.concurrent.ExecutorService;
028 import java.util.concurrent.Executors;
029 import java.util.concurrent.Future;
030 import java.util.concurrent.RejectedExecutionException;
031 import java.util.concurrent.TimeUnit;
032 import java.util.concurrent.TimeoutException;
033
034
037 public class ProcessUtil {
038
039 public static final ConsumerOutputProcessor CONSUMER_OUTPUT_PROCESSOR =
040 new ConsumerOutputProcessor();
041
042 public static final LoggingOutputProcessor LOGGING_OUTPUT_PROCESSOR =
043 new LoggingOutputProcessor();
044
045 public static <O, E> Future<ObjectValuePair<O, E>> execute(
046 OutputProcessor<O, E> outputProcessor, List<String> arguments)
047 throws ProcessException {
048
049 if (outputProcessor == null) {
050 throw new NullPointerException("Output processor is null");
051 }
052
053 if (arguments == null) {
054 throw new NullPointerException("Arguments is null");
055 }
056
057 ProcessBuilder processBuilder = new ProcessBuilder(arguments);
058
059 try {
060 Process process = processBuilder.start();
061
062 ExecutorService executorService = _getExecutorService();
063
064 try {
065 Future<O> stdOutFuture = executorService.submit(
066 new ProcessStdOutCallable<O>(outputProcessor, process));
067
068 Future<E> stdErrFuture = executorService.submit(
069 new ProcessStdErrCallable<E>(outputProcessor, process));
070
071 return new BindedFuture<O, E>(
072 stdOutFuture, stdErrFuture, process);
073 }
074 catch (RejectedExecutionException ree) {
075 process.destroy();
076
077 throw new ProcessException(
078 "Cancelled execution because of a concurrent destroy", ree);
079 }
080 }
081 catch (IOException e) {
082 throw new ProcessException(e);
083 }
084 }
085
086 public static <O, E> Future<ObjectValuePair<O, E>> execute(
087 OutputProcessor<O, E> outputProcessor, String... arguments)
088 throws ProcessException {
089
090 return execute(outputProcessor, Arrays.asList(arguments));
091 }
092
093 public void destroy() {
094 if (_executorService == null) {
095 return;
096 }
097
098 synchronized (ProcessUtil.class) {
099 if (_executorService != null) {
100 _executorService.shutdownNow();
101
102 _executorService = null;
103 }
104 }
105 }
106
107 private static ExecutorService _getExecutorService() {
108 if (_executorService != null) {
109 return _executorService;
110 }
111
112 synchronized (ProcessUtil.class) {
113 if (_executorService == null) {
114 _executorService = Executors.newCachedThreadPool(
115 new NamedThreadFactory(
116 ProcessUtil.class.getName(), Thread.MIN_PRIORITY,
117 PortalClassLoaderUtil.getClassLoader()));
118 }
119 }
120
121 return _executorService;
122 }
123
124 private static volatile ExecutorService _executorService;
125
126 private static class BindedFuture<O, E>
127 implements Future<ObjectValuePair<O, E>> {
128
129 public BindedFuture(
130 Future<O> stdOutFuture, Future<E> stdErrFuture, Process process) {
131
132 _stdOutFuture = stdOutFuture;
133 _stdErrFuture = stdErrFuture;
134 _process = process;
135 }
136
137 public boolean cancel(boolean mayInterruptIfRunning) {
138 if (_stdOutFuture.isCancelled() || _stdOutFuture.isDone()) {
139 return false;
140 }
141
142 _stdErrFuture.cancel(true);
143 _stdOutFuture.cancel(true);
144 _process.destroy();
145
146 return true;
147 }
148
149 public ObjectValuePair<O, E> get()
150 throws ExecutionException, InterruptedException {
151
152 E stdErrResult = _stdErrFuture.get();
153 O stdOutResult = _stdOutFuture.get();
154
155 return new ObjectValuePair<O, E>(stdOutResult, stdErrResult);
156 }
157
158 public ObjectValuePair<O, E> get(long timeout, TimeUnit unit)
159 throws ExecutionException, InterruptedException, TimeoutException {
160
161 long startTime = System.currentTimeMillis();
162
163 E stdErrResult = _stdErrFuture.get(timeout, unit);
164
165 long elapseTime = System.currentTimeMillis() - startTime;
166
167 long secondTimeout =
168 timeout - unit.convert(elapseTime, TimeUnit.MILLISECONDS);
169
170 O stdOutResult = _stdOutFuture.get(secondTimeout, unit);
171
172 return new ObjectValuePair<O, E>(stdOutResult, stdErrResult);
173 }
174
175 public boolean isCancelled() {
176 return _stdOutFuture.isCancelled();
177 }
178
179 public boolean isDone() {
180 return _stdOutFuture.isDone();
181 }
182
183 private final Future<E> _stdErrFuture;
184 private final Future<O> _stdOutFuture;
185 private final Process _process;
186
187 }
188
189 private static class ProcessStdErrCallable<T> implements Callable<T> {
190
191 public ProcessStdErrCallable(
192 OutputProcessor<?, T> outputProcessor, Process process) {
193
194 _outputProcessor = outputProcessor;
195 _process = process;
196 }
197
198 public T call() throws Exception {
199 return _outputProcessor.processStdErr(_process.getErrorStream());
200 }
201
202 private final OutputProcessor<?, T> _outputProcessor;
203 private final Process _process;
204
205 }
206
207 private static class ProcessStdOutCallable<T> implements Callable<T> {
208
209 public ProcessStdOutCallable(
210 OutputProcessor<T, ?> outputProcessor, Process process) {
211
212 _outputProcessor = outputProcessor;
213 _process = process;
214 }
215
216 public T call() throws Exception {
217 try {
218 return _outputProcessor.processStdOut(
219 _process.getInputStream());
220 }
221 finally {
222 try {
223 int exitCode = _process.waitFor();
224
225 if (exitCode != 0) {
226 throw new ProcessException(
227 "Subprocess terminated with exit code " + exitCode);
228 }
229 }
230 catch (InterruptedException ie) {
231 _process.destroy();
232
233 throw new ProcessException(
234 "Forcibly killed subprocess on interruption", ie);
235 }
236 }
237 }
238
239 private final OutputProcessor<T, ?> _outputProcessor;
240 private final Process _process;
241
242 }
243
244 }