001
014
015 package com.liferay.portal.kernel.process;
016
017 import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
019 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedOutputStream;
020 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.process.log.ProcessOutputStream;
024 import com.liferay.portal.kernel.util.NamedThreadFactory;
025 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
026 import com.liferay.portal.kernel.util.StreamUtil;
027 import com.liferay.portal.kernel.util.StringPool;
028
029 import java.io.EOFException;
030 import java.io.File;
031 import java.io.FileDescriptor;
032 import java.io.FileOutputStream;
033 import java.io.IOException;
034 import java.io.ObjectInputStream;
035 import java.io.ObjectOutputStream;
036 import java.io.PrintStream;
037 import java.io.Serializable;
038 import java.io.StreamCorruptedException;
039
040 import java.util.ArrayList;
041 import java.util.Collections;
042 import java.util.Iterator;
043 import java.util.List;
044 import java.util.Set;
045 import java.util.concurrent.Callable;
046 import java.util.concurrent.ConcurrentHashMap;
047 import java.util.concurrent.ConcurrentMap;
048 import java.util.concurrent.ExecutionException;
049 import java.util.concurrent.ExecutorService;
050 import java.util.concurrent.Executors;
051 import java.util.concurrent.Future;
052 import java.util.concurrent.RejectedExecutionException;
053 import java.util.concurrent.TimeUnit;
054 import java.util.concurrent.TimeoutException;
055 import java.util.concurrent.atomic.AtomicReference;
056
057
060 public class ProcessExecutor {
061
062 public static <T extends Serializable> Future<T> execute(
063 String classPath, List<String> arguments,
064 ProcessCallable<? extends Serializable> processCallable)
065 throws ProcessException {
066
067 return execute("java", classPath, arguments, processCallable);
068 }
069
070 public static <T extends Serializable> Future<T> execute(
071 String classPath,
072 ProcessCallable<? extends Serializable> processCallable)
073 throws ProcessException {
074
075 return execute(
076 "java", classPath, Collections.<String>emptyList(),
077 processCallable);
078 }
079
080 public static <T extends Serializable> Future<T> execute(
081 String java, String classPath, List<String> arguments,
082 ProcessCallable<? extends Serializable> processCallable)
083 throws ProcessException {
084
085 try {
086 List<String> commands = new ArrayList<String>(arguments.size() + 4);
087
088 commands.add(java);
089 commands.add("-cp");
090 commands.add(classPath);
091 commands.addAll(arguments);
092 commands.add(ProcessExecutor.class.getName());
093
094 ProcessBuilder processBuilder = new ProcessBuilder(commands);
095
096 Process process = processBuilder.start();
097
098 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
099 process.getOutputStream());
100
101 try {
102 objectOutputStream.writeObject(processCallable);
103 }
104 finally {
105 objectOutputStream.close();
106 }
107
108 ExecutorService executorService = _getExecutorService();
109
110 SubprocessReactor subprocessReactor = new SubprocessReactor(
111 process);
112
113 try {
114 Future<ProcessCallable<? extends Serializable>>
115 futureResponseProcessCallable = executorService.submit(
116 subprocessReactor);
117
118
119
120
121 _managedProcesses.add(process);
122
123 return new ProcessExecutionFutureResult<T>(
124 futureResponseProcessCallable, process);
125 }
126 catch (RejectedExecutionException ree) {
127 process.destroy();
128
129 throw new ProcessException(
130 "Cancelled execution because of a concurrent destroy", ree);
131 }
132 }
133 catch (IOException ioe) {
134 throw new ProcessException(ioe);
135 }
136 }
137
138 public static void main(String[] arguments)
139 throws ClassNotFoundException, IOException {
140
141 PrintStream oldOutPrintStream = System.out;
142
143 ObjectOutputStream objectOutputStream = null;
144 ProcessOutputStream outProcessOutputStream = null;
145
146 synchronized (oldOutPrintStream) {
147 oldOutPrintStream.flush();
148
149 FileOutputStream fileOutputStream = new FileOutputStream(
150 FileDescriptor.out);
151
152 objectOutputStream = new ObjectOutputStream(
153 new UnsyncBufferedOutputStream(fileOutputStream));
154
155 outProcessOutputStream = new ProcessOutputStream(
156 objectOutputStream, false);
157
158 ProcessContext._setProcessOutputStream(outProcessOutputStream);
159
160 PrintStream newOutPrintStream = new PrintStream(
161 outProcessOutputStream, true);
162
163 System.setOut(newOutPrintStream);
164 }
165
166 ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
167 objectOutputStream, true);
168
169 PrintStream errPrintStream = new PrintStream(
170 errProcessOutputStream, true);
171
172 System.setErr(errPrintStream);
173
174 try {
175 ObjectInputStream objectInputStream = new ObjectInputStream(
176 System.in);
177
178 ProcessCallable<?> processCallable =
179 (ProcessCallable<?>)objectInputStream.readObject();
180
181 String logPrefixString =
182 StringPool.OPEN_BRACKET.concat(
183 processCallable.toString()).concat(
184 StringPool.CLOSE_BRACKET);
185
186 byte[] logPrefix = logPrefixString.getBytes(StringPool.UTF8);
187
188 outProcessOutputStream.setLogPrefix(logPrefix);
189 errProcessOutputStream.setLogPrefix(logPrefix);
190
191 Serializable result = processCallable.call();
192
193 System.out.flush();
194
195 outProcessOutputStream.writeProcessCallable(
196 new ReturnProcessCallable<Serializable>(result));
197
198 outProcessOutputStream.flush();
199 }
200 catch (ProcessException pe) {
201 errPrintStream.flush();
202
203 errProcessOutputStream.writeProcessCallable(
204 new ExceptionProcessCallable(pe));
205
206 errProcessOutputStream.flush();
207 }
208 }
209
210 public void destroy() {
211 if (_executorService == null) {
212 return;
213 }
214
215 synchronized (ProcessExecutor.class) {
216 if (_executorService != null) {
217 _executorService.shutdownNow();
218
219
220
221
222
223
224
225
226 Iterator<Process> iterator = _managedProcesses.iterator();
227
228 while (iterator.hasNext()) {
229 Process process = iterator.next();
230
231 process.destroy();
232
233 iterator.remove();
234 }
235
236
237
238
239
240
241 _managedProcesses.clear();
242
243 _executorService = null;
244 }
245 }
246 }
247
248 public static class ProcessContext {
249
250 public static boolean attach(
251 String message, long interval, ShutdownHook shutdownHook) {
252
253 HeartbeatThread heartbeatThread = new HeartbeatThread(
254 message, interval, shutdownHook);
255
256 boolean value = _heartbeatThreadReference.compareAndSet(
257 null, heartbeatThread);
258
259 if (value) {
260 heartbeatThread.start();
261 }
262
263 return value;
264 }
265
266 public static void detach() throws InterruptedException {
267 HeartbeatThread heartbeatThread =
268 _heartbeatThreadReference.getAndSet(null);
269
270 if (heartbeatThread != null) {
271 heartbeatThread.detach();
272 heartbeatThread.join();
273 }
274 }
275
276 public static ConcurrentMap<String, Object> getAttributes() {
277 return _attributes;
278 }
279
280 public static ProcessOutputStream getProcessOutputStream() {
281 return _processOutputStream;
282 }
283
284 public static boolean isAttached() {
285 HeartbeatThread attachThread = _heartbeatThreadReference.get();
286
287 if (attachThread != null) {
288 return true;
289 }
290 else {
291 return false;
292 }
293 }
294
295 private static void _setProcessOutputStream(
296 ProcessOutputStream processOutputStream) {
297
298 _processOutputStream = processOutputStream;
299 }
300
301 private ProcessContext() {
302 }
303
304 private static ConcurrentMap<String, Object> _attributes =
305 new ConcurrentHashMap<String, Object>();
306 private static AtomicReference<HeartbeatThread>
307 _heartbeatThreadReference = new AtomicReference<HeartbeatThread>();
308 private static ProcessOutputStream _processOutputStream;
309
310 }
311
/**
 * Callback consulted by the heartbeat thread when a heartbeat fails.
 */
public interface ShutdownHook {

	/**
	 * Passed when writing the heartbeat failed with an
	 * <code>IOException</code>.
	 */
	int BROKEN_PIPE_CODE = 1;

	/**
	 * Passed when the heartbeat thread was interrupted without having been
	 * detached.
	 */
	int INTERRUPTION_CODE = 2;

	/**
	 * Passed when the heartbeat failed with any other throwable.
	 */
	int UNKNOWN_CODE = 3;

	/**
	 * Reacts to a heartbeat failure.
	 *
	 * @param  shutdownCode one of the codes declared on this interface
	 * @param  shutdownThrowable the throwable that made the heartbeat fail
	 * @return <code>true</code> to stop the heartbeat thread;
	 *         <code>false</code> to let it keep running
	 */
	boolean shutdown(int shutdownCode, Throwable shutdownThrowable);

}
323
324 private static ExecutorService _getExecutorService() {
325 if (_executorService != null) {
326 return _executorService;
327 }
328
329 synchronized (ProcessExecutor.class) {
330 if (_executorService == null) {
331 _executorService = Executors.newCachedThreadPool(
332 new NamedThreadFactory(
333 ProcessExecutor.class.getName(), Thread.MIN_PRIORITY,
334 PortalClassLoaderUtil.getClassLoader()));
335 }
336 }
337
338 return _executorService;
339 }
340
341 private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);
342
343 private static volatile ExecutorService _executorService;
344 private static Set<Process> _managedProcesses =
345 new ConcurrentHashSet<Process>();
346
347 private static class HeartbeatThread extends Thread {
348
349 public HeartbeatThread(
350 String message, long interval, ShutdownHook shutdownHook) {
351
352 if (shutdownHook == null) {
353 throw new IllegalArgumentException("Shutdown hook is null");
354 }
355
356 _interval = interval;
357 _shutdownHook = shutdownHook;
358
359 _pringBackProcessCallable = new PingbackProcessCallable(message);
360
361 setDaemon(true);
362 setName(HeartbeatThread.class.getSimpleName());
363 }
364
365 public void detach() {
366 _detach = true;
367
368 interrupt();
369 }
370
371 @Override
372 public void run() {
373 ProcessOutputStream processOutputStream =
374 ProcessContext.getProcessOutputStream();
375
376 int shutdownCode = 0;
377 Throwable shutdownThrowable = null;
378
379 while (!_detach) {
380 try {
381 sleep(_interval);
382
383 processOutputStream.writeProcessCallable(
384 _pringBackProcessCallable);
385 }
386 catch (InterruptedException ie) {
387 if (_detach) {
388 return;
389 }
390 else {
391 shutdownThrowable = ie;
392
393 shutdownCode = ShutdownHook.INTERRUPTION_CODE;
394 }
395 }
396 catch (IOException ioe) {
397 shutdownThrowable = ioe;
398
399 shutdownCode = ShutdownHook.BROKEN_PIPE_CODE;
400 }
401 catch (Throwable throwable) {
402 shutdownThrowable = throwable;
403
404 shutdownCode = ShutdownHook.UNKNOWN_CODE;
405 }
406
407 if (shutdownCode != 0) {
408 _detach = _shutdownHook.shutdown(
409 shutdownCode, shutdownThrowable);
410 }
411 }
412 }
413
414 private volatile boolean _detach;
415 private final long _interval;
416 private final ProcessCallable<String> _pringBackProcessCallable;
417 private final ShutdownHook _shutdownHook;
418
419 }
420
421 private static class PingbackProcessCallable
422 implements ProcessCallable<String> {
423
424 public PingbackProcessCallable(String message) {
425 _message = message;
426 }
427
428 public String call() {
429 return _message;
430 }
431
432 private final String _message;
433
434 }
435
436 private static class ProcessExecutionFutureResult<T> implements Future<T> {
437
438 public ProcessExecutionFutureResult(
439 Future<ProcessCallable<? extends Serializable>> future,
440 Process process) {
441
442 _future = future;
443 _process = process;
444 }
445
446 public boolean cancel(boolean mayInterruptIfRunning) {
447 if (_future.isCancelled() || _future.isDone()) {
448 return false;
449 }
450
451 _future.cancel(true);
452 _process.destroy();
453
454 return true;
455 }
456
457 public boolean isCancelled() {
458 return _future.isCancelled();
459 }
460
461 public boolean isDone() {
462 return _future.isDone();
463 }
464
465 public T get() throws ExecutionException, InterruptedException {
466 ProcessCallable<?> processCallable = _future.get();
467
468 return get(processCallable);
469 }
470
471 public T get(long timeout, TimeUnit timeUnit)
472 throws ExecutionException, InterruptedException, TimeoutException {
473
474 ProcessCallable<?> processCallable = _future.get(timeout, timeUnit);
475
476 return get(processCallable);
477 }
478
479 private T get(ProcessCallable<?> processCallable)
480 throws ExecutionException {
481
482 try {
483 if (processCallable instanceof ReturnProcessCallable<?>) {
484 return (T)processCallable.call();
485 }
486 else {
487 ExceptionProcessCallable exceptionProcessCallable =
488 (ExceptionProcessCallable)processCallable;
489
490 throw exceptionProcessCallable.call();
491 }
492 }
493 catch (ProcessException pe) {
494 throw new ExecutionException(pe);
495 }
496 }
497
498 private final Future<ProcessCallable<?>> _future;
499 private final Process _process;
500
501 }
502
503 private static class SubprocessReactor
504 implements Callable<ProcessCallable<? extends Serializable>> {
505
506 public SubprocessReactor(Process process) {
507 _process = process;
508 }
509
510 public ProcessCallable<? extends Serializable> call() throws Exception {
511 ProcessCallable<?> resultProcessCallable = null;
512
513 UnsyncBufferedInputStream unsyncBufferedInputStream =
514 new UnsyncBufferedInputStream(_process.getInputStream());
515
516 try {
517 ObjectInputStream objectInputStream = null;
518
519 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
520 new UnsyncByteArrayOutputStream();
521
522 while (true) {
523 try {
524
525
526
527 unsyncBufferedInputStream.mark(4);
528
529 objectInputStream =
530 new PortalClassLoaderObjectInputStream(
531 unsyncBufferedInputStream);
532
533
534
535
536 if (unsyncByteArrayOutputStream.size() > 0) {
537 if (_log.isWarnEnabled()) {
538 _log.warn(
539 "Found corrupt leading log " +
540 unsyncByteArrayOutputStream.toString());
541 }
542 }
543
544 unsyncByteArrayOutputStream = null;
545
546 break;
547 }
548 catch (StreamCorruptedException sce) {
549
550
551
552 unsyncBufferedInputStream.reset();
553
554 unsyncByteArrayOutputStream.write(
555 unsyncBufferedInputStream.read());
556 }
557 }
558
559 while (true) {
560 ProcessCallable<?> processCallable =
561 (ProcessCallable<?>)objectInputStream.readObject();
562
563 if ((processCallable instanceof ExceptionProcessCallable) ||
564 (processCallable instanceof ReturnProcessCallable<?>)) {
565
566 resultProcessCallable = processCallable;
567
568 continue;
569 }
570
571 Serializable returnValue = processCallable.call();
572
573 if (_log.isDebugEnabled()) {
574 _log.debug(
575 "Invoked generic process callable " +
576 processCallable + " with return value " +
577 returnValue);
578 }
579 }
580 }
581 catch (StreamCorruptedException sce) {
582 File file = File.createTempFile(
583 "corrupted-stream-dump-" + System.currentTimeMillis(),
584 ".log");
585
586 _log.error(
587 "Dumping content of corrupted object input stream to " +
588 file.getAbsolutePath(),
589 sce);
590
591 FileOutputStream fileOutputStream = new FileOutputStream(file);
592
593 StreamUtil.transfer(
594 unsyncBufferedInputStream, fileOutputStream);
595
596 throw new ProcessException(
597 "Corrupted object input stream", sce);
598 }
599 catch (EOFException eofe) {
600 throw new ProcessException(
601 "Subprocess piping back ended prematurely", eofe);
602 }
603 finally {
604 try {
605 int exitCode = _process.waitFor();
606
607 if (exitCode != 0) {
608 throw new ProcessException(
609 "Subprocess terminated with exit code " + exitCode);
610 }
611 }
612 catch (InterruptedException ie) {
613 _process.destroy();
614
615 throw new ProcessException(
616 "Forcibly killed subprocess on interruption", ie);
617 }
618
619 _managedProcesses.remove(_process);
620
621 if (resultProcessCallable != null) {
622
623
624
625 return resultProcessCallable;
626 }
627 }
628 }
629
630 private final Process _process;
631
632 }
633
634 }