001
014
015 package com.liferay.portal.kernel.process;
016
017 import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
019 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedOutputStream;
020 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.process.log.ProcessOutputStream;
024 import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
025 import com.liferay.portal.kernel.util.NamedThreadFactory;
026 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027 import com.liferay.portal.kernel.util.StreamUtil;
028 import com.liferay.portal.kernel.util.StringPool;
029
030 import java.io.EOFException;
031 import java.io.File;
032 import java.io.FileDescriptor;
033 import java.io.FileOutputStream;
034 import java.io.IOException;
035 import java.io.ObjectInputStream;
036 import java.io.ObjectOutputStream;
037 import java.io.PrintStream;
038 import java.io.Serializable;
039 import java.io.StreamCorruptedException;
040
041 import java.util.ArrayList;
042 import java.util.Collections;
043 import java.util.Iterator;
044 import java.util.List;
045 import java.util.Set;
046 import java.util.concurrent.Callable;
047 import java.util.concurrent.ConcurrentHashMap;
048 import java.util.concurrent.ConcurrentMap;
049 import java.util.concurrent.ExecutionException;
050 import java.util.concurrent.ExecutorService;
051 import java.util.concurrent.Executors;
052 import java.util.concurrent.Future;
053 import java.util.concurrent.RejectedExecutionException;
054 import java.util.concurrent.TimeUnit;
055 import java.util.concurrent.TimeoutException;
056 import java.util.concurrent.atomic.AtomicReference;
057
058
061 public class ProcessExecutor {
062
063 public static <T extends Serializable> Future<T> execute(
064 String classPath, List<String> arguments,
065 ProcessCallable<? extends Serializable> processCallable)
066 throws ProcessException {
067
068 return execute("java", classPath, arguments, processCallable);
069 }
070
071 public static <T extends Serializable> Future<T> execute(
072 String classPath,
073 ProcessCallable<? extends Serializable> processCallable)
074 throws ProcessException {
075
076 return execute(
077 "java", classPath, Collections.<String>emptyList(),
078 processCallable);
079 }
080
081 public static <T extends Serializable> Future<T> execute(
082 String java, String classPath, List<String> arguments,
083 ProcessCallable<? extends Serializable> processCallable)
084 throws ProcessException {
085
086 try {
087 List<String> commands = new ArrayList<String>(arguments.size() + 4);
088
089 commands.add(java);
090 commands.add("-cp");
091 commands.add(classPath);
092 commands.addAll(arguments);
093 commands.add(ProcessExecutor.class.getName());
094
095 ProcessBuilder processBuilder = new ProcessBuilder(commands);
096
097 Process process = processBuilder.start();
098
099 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
100 process.getOutputStream());
101
102 try {
103 objectOutputStream.writeObject(processCallable);
104 }
105 finally {
106 objectOutputStream.close();
107 }
108
109 ExecutorService executorService = _getExecutorService();
110
111 SubprocessReactor subprocessReactor = new SubprocessReactor(
112 process);
113
114 try {
115 Future<ProcessCallable<? extends Serializable>>
116 futureResponseProcessCallable = executorService.submit(
117 subprocessReactor);
118
119
120
121
122 _managedProcesses.add(process);
123
124 return new ProcessExecutionFutureResult<T>(
125 futureResponseProcessCallable, process);
126 }
127 catch (RejectedExecutionException ree) {
128 process.destroy();
129
130 throw new ProcessException(
131 "Cancelled execution because of a concurrent destroy", ree);
132 }
133 }
134 catch (IOException ioe) {
135 throw new ProcessException(ioe);
136 }
137 }
138
139 public static void main(String[] arguments)
140 throws ClassNotFoundException, IOException {
141
142 PrintStream oldOutPrintStream = System.out;
143
144 ObjectOutputStream objectOutputStream = null;
145 ProcessOutputStream outProcessOutputStream = null;
146
147 synchronized (oldOutPrintStream) {
148 oldOutPrintStream.flush();
149
150 FileOutputStream fileOutputStream = new FileOutputStream(
151 FileDescriptor.out);
152
153 objectOutputStream = new ObjectOutputStream(
154 new UnsyncBufferedOutputStream(fileOutputStream));
155
156 outProcessOutputStream = new ProcessOutputStream(
157 objectOutputStream, false);
158
159 ProcessContext._setProcessOutputStream(outProcessOutputStream);
160
161 PrintStream newOutPrintStream = new PrintStream(
162 outProcessOutputStream, true);
163
164 System.setOut(newOutPrintStream);
165 }
166
167 ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
168 objectOutputStream, true);
169
170 PrintStream errPrintStream = new PrintStream(
171 errProcessOutputStream, true);
172
173 System.setErr(errPrintStream);
174
175 try {
176 ObjectInputStream objectInputStream = new ObjectInputStream(
177 System.in);
178
179 ProcessCallable<?> processCallable =
180 (ProcessCallable<?>)objectInputStream.readObject();
181
182 String logPrefixString =
183 StringPool.OPEN_BRACKET.concat(
184 processCallable.toString()).concat(
185 StringPool.CLOSE_BRACKET);
186
187 byte[] logPrefix = logPrefixString.getBytes(StringPool.UTF8);
188
189 outProcessOutputStream.setLogPrefix(logPrefix);
190 errProcessOutputStream.setLogPrefix(logPrefix);
191
192 Serializable result = processCallable.call();
193
194 System.out.flush();
195
196 outProcessOutputStream.writeProcessCallable(
197 new ReturnProcessCallable<Serializable>(result));
198
199 outProcessOutputStream.flush();
200 }
201 catch (ProcessException pe) {
202 errPrintStream.flush();
203
204 errProcessOutputStream.writeProcessCallable(
205 new ExceptionProcessCallable(pe));
206
207 errProcessOutputStream.flush();
208 }
209 }
210
211 public void destroy() {
212 if (_executorService == null) {
213 return;
214 }
215
216 synchronized (ProcessExecutor.class) {
217 if (_executorService != null) {
218 _executorService.shutdownNow();
219
220
221
222
223
224
225
226
227 Iterator<Process> iterator = _managedProcesses.iterator();
228
229 while (iterator.hasNext()) {
230 Process process = iterator.next();
231
232 process.destroy();
233
234 iterator.remove();
235 }
236
237
238
239
240
241
242 _managedProcesses.clear();
243
244 _executorService = null;
245 }
246 }
247 }
248
249 public static class ProcessContext {
250
251 public static boolean attach(
252 String message, long interval, ShutdownHook shutdownHook) {
253
254 HeartbeatThread heartbeatThread = new HeartbeatThread(
255 message, interval, shutdownHook);
256
257 boolean value = _heartbeatThreadReference.compareAndSet(
258 null, heartbeatThread);
259
260 if (value) {
261 heartbeatThread.start();
262 }
263
264 return value;
265 }
266
267 public static void detach() throws InterruptedException {
268 HeartbeatThread heartbeatThread =
269 _heartbeatThreadReference.getAndSet(null);
270
271 if (heartbeatThread != null) {
272 heartbeatThread.detach();
273 heartbeatThread.join();
274 }
275 }
276
277 public static ConcurrentMap<String, Object> getAttributes() {
278 return _attributes;
279 }
280
281 public static ProcessOutputStream getProcessOutputStream() {
282 return _processOutputStream;
283 }
284
285 public static boolean isAttached() {
286 HeartbeatThread attachThread = _heartbeatThreadReference.get();
287
288 if (attachThread != null) {
289 return true;
290 }
291 else {
292 return false;
293 }
294 }
295
296 private static void _setProcessOutputStream(
297 ProcessOutputStream processOutputStream) {
298
299 _processOutputStream = processOutputStream;
300 }
301
302 private ProcessContext() {
303 }
304
305 private static ConcurrentMap<String, Object> _attributes =
306 new ConcurrentHashMap<String, Object>();
307 private static AtomicReference<HeartbeatThread>
308 _heartbeatThreadReference = new AtomicReference<HeartbeatThread>();
309 private static ProcessOutputStream _processOutputStream;
310
311 }
312
/**
 * Callback consulted by the heartbeat thread when a heartbeat fails. The
 * code tells which kind of failure occurred.
 */
public interface ShutdownHook {

    /** Code passed when the failure was an <code>IOException</code>. */
    int BROKEN_PIPE_CODE = 1;

    /** Code passed when the heartbeat thread was interrupted. */
    int INTERRUPTION_CODE = 2;

    /** Code passed for any other throwable. */
    int UNKNOWN_CODE = 3;

    /**
     * Reacts to a heartbeat failure.
     *
     * @param  shutdownCode one of the codes declared on this interface
     * @param  shutdownThrowable the throwable that caused the failure
     * @return <code>true</code> to stop the heartbeat thread;
     *         <code>false</code> to let it continue
     */
    boolean shutdown(int shutdownCode, Throwable shutdownThrowable);

}
324
325 private static ExecutorService _getExecutorService() {
326 if (_executorService != null) {
327 return _executorService;
328 }
329
330 synchronized (ProcessExecutor.class) {
331 if (_executorService == null) {
332 _executorService = Executors.newCachedThreadPool(
333 new NamedThreadFactory(
334 ProcessExecutor.class.getName(), Thread.MIN_PRIORITY,
335 PortalClassLoaderUtil.getClassLoader()));
336 }
337 }
338
339 return _executorService;
340 }
341
342 private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);
343
344 private static volatile ExecutorService _executorService;
345 private static Set<Process> _managedProcesses =
346 new ConcurrentHashSet<Process>();
347
348 private static class HeartbeatThread extends Thread {
349
350 public HeartbeatThread(
351 String message, long interval, ShutdownHook shutdownHook) {
352
353 if (shutdownHook == null) {
354 throw new IllegalArgumentException("Shutdown hook is null");
355 }
356
357 _interval = interval;
358 _shutdownHook = shutdownHook;
359
360 _pringBackProcessCallable = new PingbackProcessCallable(message);
361
362 setDaemon(true);
363 setName(HeartbeatThread.class.getSimpleName());
364 }
365
366 public void detach() {
367 _detach = true;
368
369 interrupt();
370 }
371
372 @Override
373 public void run() {
374 ProcessOutputStream processOutputStream =
375 ProcessContext.getProcessOutputStream();
376
377 int shutdownCode = 0;
378 Throwable shutdownThrowable = null;
379
380 while (!_detach) {
381 try {
382 sleep(_interval);
383
384 processOutputStream.writeProcessCallable(
385 _pringBackProcessCallable);
386 }
387 catch (InterruptedException ie) {
388 if (_detach) {
389 return;
390 }
391 else {
392 shutdownThrowable = ie;
393
394 shutdownCode = ShutdownHook.INTERRUPTION_CODE;
395 }
396 }
397 catch (IOException ioe) {
398 shutdownThrowable = ioe;
399
400 shutdownCode = ShutdownHook.BROKEN_PIPE_CODE;
401 }
402 catch (Throwable throwable) {
403 shutdownThrowable = throwable;
404
405 shutdownCode = ShutdownHook.UNKNOWN_CODE;
406 }
407
408 if (shutdownCode != 0) {
409 _detach = _shutdownHook.shutdown(
410 shutdownCode, shutdownThrowable);
411 }
412 }
413 }
414
415 private volatile boolean _detach;
416 private final long _interval;
417 private final ProcessCallable<String> _pringBackProcessCallable;
418 private final ShutdownHook _shutdownHook;
419
420 }
421
422 private static class PingbackProcessCallable
423 implements ProcessCallable<String> {
424
425 public PingbackProcessCallable(String message) {
426 _message = message;
427 }
428
429 @Override
430 public String call() {
431 return _message;
432 }
433
434 private static final long serialVersionUID = 1L;
435
436 private final String _message;
437
438 }
439
440 private static class ProcessExecutionFutureResult<T> implements Future<T> {
441
442 public ProcessExecutionFutureResult(
443 Future<ProcessCallable<? extends Serializable>> future,
444 Process process) {
445
446 _future = future;
447 _process = process;
448 }
449
450 @Override
451 public boolean cancel(boolean mayInterruptIfRunning) {
452 if (_future.isCancelled() || _future.isDone()) {
453 return false;
454 }
455
456 _future.cancel(true);
457 _process.destroy();
458
459 return true;
460 }
461
462 @Override
463 public boolean isCancelled() {
464 return _future.isCancelled();
465 }
466
467 @Override
468 public boolean isDone() {
469 return _future.isDone();
470 }
471
472 @Override
473 public T get() throws ExecutionException, InterruptedException {
474 ProcessCallable<?> processCallable = _future.get();
475
476 return get(processCallable);
477 }
478
479 @Override
480 public T get(long timeout, TimeUnit timeUnit)
481 throws ExecutionException, InterruptedException, TimeoutException {
482
483 ProcessCallable<?> processCallable = _future.get(timeout, timeUnit);
484
485 return get(processCallable);
486 }
487
488 private T get(ProcessCallable<?> processCallable)
489 throws ExecutionException {
490
491 try {
492 if (processCallable instanceof ReturnProcessCallable<?>) {
493 return (T)processCallable.call();
494 }
495 else {
496 ExceptionProcessCallable exceptionProcessCallable =
497 (ExceptionProcessCallable)processCallable;
498
499 throw exceptionProcessCallable.call();
500 }
501 }
502 catch (ProcessException pe) {
503 throw new ExecutionException(pe);
504 }
505 }
506
507 private final Future<ProcessCallable<?>> _future;
508 private final Process _process;
509
510 }
511
512 private static class SubprocessReactor
513 implements Callable<ProcessCallable<? extends Serializable>> {
514
515 public SubprocessReactor(Process process) {
516 _process = process;
517 }
518
519 @Override
520 public ProcessCallable<? extends Serializable> call() throws Exception {
521 ProcessCallable<?> resultProcessCallable = null;
522
523 UnsyncBufferedInputStream unsyncBufferedInputStream =
524 new UnsyncBufferedInputStream(_process.getInputStream());
525
526 try {
527 ObjectInputStream objectInputStream = null;
528
529 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
530 new UnsyncByteArrayOutputStream();
531
532 while (true) {
533 try {
534
535
536
537 unsyncBufferedInputStream.mark(4);
538
539 objectInputStream = new ClassLoaderObjectInputStream(
540 unsyncBufferedInputStream,
541 PortalClassLoaderUtil.getClassLoader());
542
543
544
545
546 if (unsyncByteArrayOutputStream.size() > 0) {
547 if (_log.isWarnEnabled()) {
548 _log.warn(
549 "Found corrupt leading log " +
550 unsyncByteArrayOutputStream.toString());
551 }
552 }
553
554 unsyncByteArrayOutputStream = null;
555
556 break;
557 }
558 catch (StreamCorruptedException sce) {
559
560
561
562 unsyncBufferedInputStream.reset();
563
564 unsyncByteArrayOutputStream.write(
565 unsyncBufferedInputStream.read());
566 }
567 }
568
569 while (true) {
570 ProcessCallable<?> processCallable =
571 (ProcessCallable<?>)objectInputStream.readObject();
572
573 if ((processCallable instanceof ExceptionProcessCallable) ||
574 (processCallable instanceof ReturnProcessCallable<?>)) {
575
576 resultProcessCallable = processCallable;
577
578 continue;
579 }
580
581 Serializable returnValue = processCallable.call();
582
583 if (_log.isDebugEnabled()) {
584 _log.debug(
585 "Invoked generic process callable " +
586 processCallable + " with return value " +
587 returnValue);
588 }
589 }
590 }
591 catch (StreamCorruptedException sce) {
592 File file = File.createTempFile(
593 "corrupted-stream-dump-" + System.currentTimeMillis(),
594 ".log");
595
596 _log.error(
597 "Dumping content of corrupted object input stream to " +
598 file.getAbsolutePath(),
599 sce);
600
601 FileOutputStream fileOutputStream = new FileOutputStream(file);
602
603 StreamUtil.transfer(
604 unsyncBufferedInputStream, fileOutputStream);
605
606 throw new ProcessException(
607 "Corrupted object input stream", sce);
608 }
609 catch (EOFException eofe) {
610 throw new ProcessException(
611 "Subprocess piping back ended prematurely", eofe);
612 }
613 finally {
614 try {
615 int exitCode = _process.waitFor();
616
617 if (exitCode != 0) {
618 throw new ProcessException(
619 "Subprocess terminated with exit code " + exitCode);
620 }
621 }
622 catch (InterruptedException ie) {
623 _process.destroy();
624
625 throw new ProcessException(
626 "Forcibly killed subprocess on interruption", ie);
627 }
628
629 _managedProcesses.remove(_process);
630
631 if (resultProcessCallable != null) {
632
633
634
635 return resultProcessCallable;
636 }
637 }
638 }
639
640 private final Process _process;
641
642 }
643
644 }