001
014
015 package com.liferay.portal.kernel.process.local;
016
017 import com.liferay.portal.kernel.concurrent.AbortPolicy;
018 import com.liferay.portal.kernel.concurrent.AsyncBroker;
019 import com.liferay.portal.kernel.concurrent.FutureListener;
020 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
021 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
022 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
023 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
024 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
025 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
026 import com.liferay.portal.kernel.log.Log;
027 import com.liferay.portal.kernel.log.LogFactoryUtil;
028 import com.liferay.portal.kernel.process.ProcessCallable;
029 import com.liferay.portal.kernel.process.ProcessChannel;
030 import com.liferay.portal.kernel.process.ProcessConfig;
031 import com.liferay.portal.kernel.process.ProcessException;
032 import com.liferay.portal.kernel.process.ProcessExecutor;
033 import com.liferay.portal.kernel.process.TerminationProcessException;
034 import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
035 import com.liferay.portal.kernel.util.NamedThreadFactory;
036 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
037 import com.liferay.portal.kernel.util.StreamUtil;
038
039 import java.io.EOFException;
040 import java.io.File;
041 import java.io.FileOutputStream;
042 import java.io.IOException;
043 import java.io.ObjectInputStream;
044 import java.io.ObjectOutputStream;
045 import java.io.Serializable;
046 import java.io.StreamCorruptedException;
047 import java.io.WriteAbortedException;
048
049 import java.util.ArrayList;
050 import java.util.Collections;
051 import java.util.HashSet;
052 import java.util.Iterator;
053 import java.util.List;
054 import java.util.Map;
055 import java.util.Map.Entry;
056 import java.util.Set;
057 import java.util.concurrent.Callable;
058 import java.util.concurrent.ConcurrentHashMap;
059 import java.util.concurrent.Future;
060 import java.util.concurrent.RejectedExecutionException;
061 import java.util.concurrent.TimeUnit;
062
063
066 public class LocalProcessExecutor implements ProcessExecutor {
067
068 public Set<Process> destroy() {
069 if (_threadPoolExecutor == null) {
070 return Collections.emptySet();
071 }
072
073 Set<Process> processes = Collections.emptySet();
074
075 synchronized (this) {
076 if (_threadPoolExecutor != null) {
077 processes = new HashSet<>();
078
079 _threadPoolExecutor.shutdownNow();
080
081
082
083
084
085
086
087
088 Set<Entry<Process, NoticeableFuture<?>>> set =
089 _managedProcesses.entrySet();
090
091 Iterator<Entry<Process, NoticeableFuture<?>>> iterator =
092 set.iterator();
093
094 while (iterator.hasNext()) {
095 Entry<Process, NoticeableFuture<?>> entry = iterator.next();
096
097 processes.add(entry.getKey());
098
099 NoticeableFuture<?> noticeableFuture = entry.getValue();
100
101 noticeableFuture.cancel(true);
102
103 iterator.remove();
104 }
105
106
107
108
109
110
111 _managedProcesses.clear();
112
113 _threadPoolExecutor = null;
114 }
115 }
116
117
118
119
120
121
122
123
124 return processes;
125 }
126
127 @Override
128 public <T extends Serializable> ProcessChannel<T> execute(
129 ProcessConfig processConfig, ProcessCallable<T> processCallable)
130 throws ProcessException {
131
132 try {
133 List<String> arguments = processConfig.getArguments();
134
135 List<String> commands = new ArrayList<>(arguments.size() + 4);
136
137 commands.add(processConfig.getJavaExecutable());
138 commands.add("-cp");
139 commands.add(processConfig.getBootstrapClassPath());
140 commands.addAll(arguments);
141 commands.add(LocalProcessLauncher.class.getName());
142
143 ProcessBuilder processBuilder = new ProcessBuilder(commands);
144
145 final Process process = processBuilder.start();
146
147 ObjectOutputStream bootstrapObjectOutputStream =
148 new ObjectOutputStream(process.getOutputStream());
149
150 bootstrapObjectOutputStream.writeObject(processCallable.toString());
151 bootstrapObjectOutputStream.writeObject(
152 processConfig.getRuntimeClassPath());
153
154 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
155 bootstrapObjectOutputStream);
156
157 objectOutputStream.writeObject(processCallable);
158
159 objectOutputStream.flush();
160
161 ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
162
163 AsyncBroker<Long, Serializable> asyncBroker = new AsyncBroker<>();
164
165 SubprocessReactor subprocessReactor = new SubprocessReactor(
166 process, processConfig.getReactClassLoader(), asyncBroker);
167
168 try {
169 NoticeableFuture<ProcessCallable<? extends Serializable>>
170 processCallableNoticeableFuture = threadPoolExecutor.submit(
171 subprocessReactor);
172
173 processCallableNoticeableFuture.addFutureListener(
174 new FutureListener
175 <ProcessCallable<? extends Serializable>>() {
176
177 @Override
178 public void complete(
179 Future<ProcessCallable<? extends Serializable>>
180 future) {
181
182 if (future.isCancelled()) {
183 process.destroy();
184 }
185 }
186
187 });
188
189
190
191
192 _managedProcesses.put(process, processCallableNoticeableFuture);
193
194 NoticeableFuture<T> noticeableFuture =
195 new NoticeableFutureConverter
196 <T, ProcessCallable<? extends Serializable>>(
197 processCallableNoticeableFuture) {
198
199 @Override
200 protected T convert(
201 ProcessCallable<? extends Serializable>
202 processCallable)
203 throws ProcessException {
204
205 if (processCallable instanceof
206 ReturnProcessCallable<?>) {
207
208 return (T)processCallable.call();
209 }
210
211 ExceptionProcessCallable exceptionProcessCallable =
212 (ExceptionProcessCallable)processCallable;
213
214 throw exceptionProcessCallable.call();
215 }
216
217 };
218
219 return new LocalProcessChannel<>(
220 noticeableFuture, objectOutputStream, asyncBroker);
221 }
222 catch (RejectedExecutionException ree) {
223 process.destroy();
224
225 throw new ProcessException(
226 "Cancelled execution because of a concurrent destroy", ree);
227 }
228 }
229 catch (IOException ioe) {
230 throw new ProcessException(ioe);
231 }
232 }
233
234 private ThreadPoolExecutor _getThreadPoolExecutor() {
235 if (_threadPoolExecutor != null) {
236 return _threadPoolExecutor;
237 }
238
239 synchronized (this) {
240 if (_threadPoolExecutor == null) {
241 _threadPoolExecutor = new ThreadPoolExecutor(
242 0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
243 Integer.MAX_VALUE, new AbortPolicy(),
244 new NamedThreadFactory(
245 LocalProcessExecutor.class.getName(),
246 Thread.MIN_PRIORITY,
247 PortalClassLoaderUtil.getClassLoader()),
248 new ThreadPoolHandlerAdapter());
249 }
250 }
251
252 return _threadPoolExecutor;
253 }
254
255 private static final Log _log = LogFactoryUtil.getLog(
256 LocalProcessExecutor.class);
257
258 private final Map<Process, NoticeableFuture<?>> _managedProcesses =
259 new ConcurrentHashMap<>();
260 private volatile ThreadPoolExecutor _threadPoolExecutor;
261
262 private class SubprocessReactor
263 implements Callable<ProcessCallable<? extends Serializable>> {
264
265 public SubprocessReactor(
266 Process process, ClassLoader reactClassLoader,
267 AsyncBroker<Long, Serializable> asyncBroker) {
268
269 _process = process;
270 _reactClassLoader = reactClassLoader;
271 _asyncBroker = asyncBroker;
272 }
273
274 @Override
275 public ProcessCallable<? extends Serializable> call() throws Exception {
276 ProcessCallable<?> resultProcessCallable = null;
277
278 AsyncBrokerThreadLocal.setAsyncBroker(_asyncBroker);
279
280 UnsyncBufferedInputStream unsyncBufferedInputStream =
281 new UnsyncBufferedInputStream(_process.getInputStream());
282
283 try {
284 ObjectInputStream objectInputStream = null;
285
286 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
287 new UnsyncByteArrayOutputStream();
288
289 while (true) {
290 try {
291
292
293
294 unsyncBufferedInputStream.mark(4);
295
296 objectInputStream = new ClassLoaderObjectInputStream(
297 unsyncBufferedInputStream, _reactClassLoader);
298
299
300
301
302 if (unsyncByteArrayOutputStream.size() > 0) {
303 if (_log.isWarnEnabled()) {
304 _log.warn(
305 "Found corrupt leading log " +
306 unsyncByteArrayOutputStream.toString());
307 }
308 }
309
310 unsyncByteArrayOutputStream = null;
311
312 break;
313 }
314 catch (StreamCorruptedException sce) {
315
316
317
318 unsyncBufferedInputStream.reset();
319
320 unsyncByteArrayOutputStream.write(
321 unsyncBufferedInputStream.read());
322 }
323 }
324
325 while (true) {
326 Object obj = null;
327
328 try {
329 obj = objectInputStream.readObject();
330 }
331 catch (WriteAbortedException wae) {
332 if (_log.isWarnEnabled()) {
333 _log.warn("Caught a write aborted exception", wae);
334 }
335
336 continue;
337 }
338
339 if (!(obj instanceof ProcessCallable)) {
340 if (_log.isInfoEnabled()) {
341 _log.info(
342 "Received a nonprocess callable piping back " +
343 obj);
344 }
345
346 continue;
347 }
348
349 ProcessCallable<?> processCallable =
350 (ProcessCallable<?>)obj;
351
352 if ((processCallable instanceof ExceptionProcessCallable) ||
353 (processCallable instanceof ReturnProcessCallable<?>)) {
354
355 resultProcessCallable = processCallable;
356
357 continue;
358 }
359
360 try {
361 Serializable returnValue = processCallable.call();
362
363 if (_log.isDebugEnabled()) {
364 _log.debug(
365 "Invoked generic process callable " +
366 processCallable + " with return value " +
367 returnValue);
368 }
369 }
370 catch (Throwable t) {
371 _log.error(
372 "Unable to invoke generic process callable", t);
373 }
374 }
375 }
376 catch (StreamCorruptedException sce) {
377 File file = File.createTempFile(
378 "corrupted-stream-dump-" + System.currentTimeMillis(),
379 ".log");
380
381 _log.error(
382 "Dumping content of corrupted object input stream to " +
383 file.getAbsolutePath(),
384 sce);
385
386 FileOutputStream fileOutputStream = new FileOutputStream(file);
387
388 StreamUtil.transfer(
389 unsyncBufferedInputStream, fileOutputStream);
390
391 throw new ProcessException(
392 "Corrupted object input stream", sce);
393 }
394 catch (EOFException eofe) {
395 throw new ProcessException(
396 "Subprocess piping back ended prematurely", eofe);
397 }
398 catch (Throwable t) {
399 _log.error("Abort subprocess piping", t);
400
401 throw t;
402 }
403 finally {
404 try {
405 int exitCode = _process.waitFor();
406
407 if (exitCode != 0) {
408 throw new TerminationProcessException(exitCode);
409 }
410 }
411 catch (InterruptedException ie) {
412 _process.destroy();
413
414 throw new ProcessException(
415 "Forcibly killed subprocess on interruption", ie);
416 }
417
418 _managedProcesses.remove(_process);
419
420 if (resultProcessCallable != null) {
421
422
423
424 return resultProcessCallable;
425 }
426
427 AsyncBrokerThreadLocal.removeAsyncBroker();
428 }
429 }
430
431 private final AsyncBroker<Long, Serializable> _asyncBroker;
432 private final Process _process;
433 private final ClassLoader _reactClassLoader;
434
435 }
436
437 }