001
014
015 package com.liferay.portal.kernel.process.local;
016
017 import com.liferay.portal.kernel.concurrent.AbortPolicy;
018 import com.liferay.portal.kernel.concurrent.AsyncBroker;
019 import com.liferay.portal.kernel.concurrent.FutureListener;
020 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
021 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
022 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
023 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
024 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
025 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
026 import com.liferay.portal.kernel.log.Log;
027 import com.liferay.portal.kernel.log.LogFactoryUtil;
028 import com.liferay.portal.kernel.process.ProcessCallable;
029 import com.liferay.portal.kernel.process.ProcessChannel;
030 import com.liferay.portal.kernel.process.ProcessConfig;
031 import com.liferay.portal.kernel.process.ProcessException;
032 import com.liferay.portal.kernel.process.ProcessExecutor;
033 import com.liferay.portal.kernel.process.TerminationProcessException;
034 import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
035 import com.liferay.portal.kernel.util.NamedThreadFactory;
036 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
037 import com.liferay.portal.kernel.util.StreamUtil;
038
039 import java.io.EOFException;
040 import java.io.File;
041 import java.io.FileOutputStream;
042 import java.io.IOException;
043 import java.io.ObjectInputStream;
044 import java.io.ObjectOutputStream;
045 import java.io.Serializable;
046 import java.io.StreamCorruptedException;
047
048 import java.util.ArrayList;
049 import java.util.Collection;
050 import java.util.Iterator;
051 import java.util.List;
052 import java.util.Map;
053 import java.util.concurrent.Callable;
054 import java.util.concurrent.ConcurrentHashMap;
055 import java.util.concurrent.Future;
056 import java.util.concurrent.RejectedExecutionException;
057 import java.util.concurrent.TimeUnit;
058
059
062 public class LocalProcessExecutor implements ProcessExecutor {
063
064 public void destroy() {
065 if (_threadPoolExecutor == null) {
066 return;
067 }
068
069 synchronized (this) {
070 if (_threadPoolExecutor != null) {
071 _threadPoolExecutor.shutdownNow();
072
073
074
075
076
077
078
079
080 Collection<NoticeableFuture<?>> values =
081 _managedProcesses.values();
082
083 Iterator<NoticeableFuture<?>> iterator = values.iterator();
084
085 while (iterator.hasNext()) {
086 NoticeableFuture<?> noticeableFuture = iterator.next();
087
088 noticeableFuture.cancel(true);
089
090 iterator.remove();
091 }
092
093
094
095
096
097
098 _managedProcesses.clear();
099
100 _threadPoolExecutor = null;
101 }
102 }
103 }
104
105 @Override
106 public <T extends Serializable> ProcessChannel<T> execute(
107 ProcessConfig processConfig, ProcessCallable<T> processCallable)
108 throws ProcessException {
109
110 try {
111 List<String> arguments = processConfig.getArguments();
112
113 List<String> commands = new ArrayList<String>(arguments.size() + 4);
114
115 commands.add(processConfig.getJavaExecutable());
116 commands.add("-cp");
117 commands.add(processConfig.getBootstrapClassPath());
118 commands.addAll(arguments);
119 commands.add(LocalProcessLauncher.class.getName());
120
121 ProcessBuilder processBuilder = new ProcessBuilder(commands);
122
123 final Process process = processBuilder.start();
124
125 ObjectOutputStream bootstrapObjectOutputStream =
126 new ObjectOutputStream(process.getOutputStream());
127
128 bootstrapObjectOutputStream.writeObject(processCallable.toString());
129 bootstrapObjectOutputStream.writeObject(
130 processConfig.getRuntimeClassPath());
131
132 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
133 bootstrapObjectOutputStream);
134
135 objectOutputStream.writeObject(processCallable);
136
137 objectOutputStream.flush();
138
139 ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
140
141 AsyncBroker<Long, Serializable> asyncBroker =
142 new AsyncBroker<Long, Serializable>();
143
144 SubprocessReactor subprocessReactor = new SubprocessReactor(
145 process, processConfig.getReactClassLoader(), asyncBroker);
146
147 try {
148 NoticeableFuture<ProcessCallable<? extends Serializable>>
149 processCallableNoticeableFuture = threadPoolExecutor.submit(
150 subprocessReactor);
151
152 processCallableNoticeableFuture.addFutureListener(
153 new FutureListener
154 <ProcessCallable<? extends Serializable>>() {
155
156 @Override
157 public void complete(
158 Future<ProcessCallable<? extends Serializable>>
159 future) {
160
161 if (future.isCancelled()) {
162 process.destroy();
163 }
164 }
165
166 });
167
168
169
170
171 _managedProcesses.put(process, processCallableNoticeableFuture);
172
173 NoticeableFuture<T> noticeableFuture =
174 new NoticeableFutureConverter
175 <T, ProcessCallable<? extends Serializable>>(
176 processCallableNoticeableFuture) {
177
178 @Override
179 protected T convert(
180 ProcessCallable<? extends Serializable>
181 processCallable)
182 throws ProcessException {
183
184 if (processCallable instanceof
185 ReturnProcessCallable<?>) {
186
187 return (T)processCallable.call();
188 }
189
190 ExceptionProcessCallable
191 exceptionProcessCallable =
192 (ExceptionProcessCallable)
193 processCallable;
194
195 throw exceptionProcessCallable.call();
196 }
197
198 };
199
200 return new LocalProcessChannel<T>(
201 noticeableFuture, objectOutputStream, asyncBroker);
202 }
203 catch (RejectedExecutionException ree) {
204 process.destroy();
205
206 throw new ProcessException(
207 "Cancelled execution because of a concurrent destroy", ree);
208 }
209 }
210 catch (IOException ioe) {
211 throw new ProcessException(ioe);
212 }
213 }
214
215 private ThreadPoolExecutor _getThreadPoolExecutor() {
216 if (_threadPoolExecutor != null) {
217 return _threadPoolExecutor;
218 }
219
220 synchronized (this) {
221 if (_threadPoolExecutor == null) {
222 _threadPoolExecutor = new ThreadPoolExecutor(
223 0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
224 Integer.MAX_VALUE, new AbortPolicy(),
225 new NamedThreadFactory(
226 LocalProcessExecutor.class.getName(),
227 Thread.MIN_PRIORITY,
228 PortalClassLoaderUtil.getClassLoader()),
229 new ThreadPoolHandlerAdapter());
230 }
231 }
232
233 return _threadPoolExecutor;
234 }
235
236 private static final Log _log = LogFactoryUtil.getLog(
237 LocalProcessExecutor.class);
238
239 private final Map<Process, NoticeableFuture<?>> _managedProcesses =
240 new ConcurrentHashMap<Process, NoticeableFuture<?>>();
241 private volatile ThreadPoolExecutor _threadPoolExecutor;
242
243 private class SubprocessReactor
244 implements Callable<ProcessCallable<? extends Serializable>> {
245
246 public SubprocessReactor(
247 Process process, ClassLoader reactClassLoader,
248 AsyncBroker<Long, Serializable> asyncBroker) {
249
250 _process = process;
251 _reactClassLoader = reactClassLoader;
252 _asyncBroker = asyncBroker;
253 }
254
255 @Override
256 public ProcessCallable<? extends Serializable> call() throws Exception {
257 ProcessCallable<?> resultProcessCallable = null;
258
259 AsyncBrokerThreadLocal.setAsyncBroker(_asyncBroker);
260
261 UnsyncBufferedInputStream unsyncBufferedInputStream =
262 new UnsyncBufferedInputStream(_process.getInputStream());
263
264 try {
265 ObjectInputStream objectInputStream = null;
266
267 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
268 new UnsyncByteArrayOutputStream();
269
270 while (true) {
271 try {
272
273
274
275 unsyncBufferedInputStream.mark(4);
276
277 objectInputStream = new ClassLoaderObjectInputStream(
278 unsyncBufferedInputStream, _reactClassLoader);
279
280
281
282
283 if (unsyncByteArrayOutputStream.size() > 0) {
284 if (_log.isWarnEnabled()) {
285 _log.warn(
286 "Found corrupt leading log " +
287 unsyncByteArrayOutputStream.toString());
288 }
289 }
290
291 unsyncByteArrayOutputStream = null;
292
293 break;
294 }
295 catch (StreamCorruptedException sce) {
296
297
298
299 unsyncBufferedInputStream.reset();
300
301 unsyncByteArrayOutputStream.write(
302 unsyncBufferedInputStream.read());
303 }
304 }
305
306 while (true) {
307 ProcessCallable<?> processCallable =
308 (ProcessCallable<?>)objectInputStream.readObject();
309
310 if ((processCallable instanceof ExceptionProcessCallable) ||
311 (processCallable instanceof ReturnProcessCallable<?>)) {
312
313 resultProcessCallable = processCallable;
314
315 continue;
316 }
317
318 try {
319 Serializable returnValue = processCallable.call();
320
321 if (_log.isDebugEnabled()) {
322 _log.debug(
323 "Invoked generic process callable " +
324 processCallable + " with return value " +
325 returnValue);
326 }
327 }
328 catch (Throwable t) {
329 _log.error(
330 "Unable to invoke generic process callable", t);
331 }
332 }
333 }
334 catch (StreamCorruptedException sce) {
335 File file = File.createTempFile(
336 "corrupted-stream-dump-" + System.currentTimeMillis(),
337 ".log");
338
339 _log.error(
340 "Dumping content of corrupted object input stream to " +
341 file.getAbsolutePath(),
342 sce);
343
344 FileOutputStream fileOutputStream = new FileOutputStream(file);
345
346 StreamUtil.transfer(
347 unsyncBufferedInputStream, fileOutputStream);
348
349 throw new ProcessException(
350 "Corrupted object input stream", sce);
351 }
352 catch (EOFException eofe) {
353 throw new ProcessException(
354 "Subprocess piping back ended prematurely", eofe);
355 }
356 catch (Throwable t) {
357 _log.error("Abort subprocess piping", t);
358
359 throw t;
360 }
361 finally {
362 try {
363 int exitCode = _process.waitFor();
364
365 if (exitCode != 0) {
366 throw new TerminationProcessException(exitCode);
367 }
368 }
369 catch (InterruptedException ie) {
370 _process.destroy();
371
372 throw new ProcessException(
373 "Forcibly killed subprocess on interruption", ie);
374 }
375
376 _managedProcesses.remove(_process);
377
378 if (resultProcessCallable != null) {
379
380
381
382 return resultProcessCallable;
383 }
384
385 AsyncBrokerThreadLocal.removeAsyncBroker();
386 }
387 }
388
389 private final AsyncBroker<Long, Serializable> _asyncBroker;
390 private final Process _process;
391 private final ClassLoader _reactClassLoader;
392
393 }
394
395 }