001
014
015 package com.liferay.portal.kernel.process.local;
016
017 import com.liferay.portal.kernel.concurrent.AbortPolicy;
018 import com.liferay.portal.kernel.concurrent.AsyncBroker;
019 import com.liferay.portal.kernel.concurrent.FutureListener;
020 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
021 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
022 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
023 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
024 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
025 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
026 import com.liferay.portal.kernel.log.Log;
027 import com.liferay.portal.kernel.log.LogFactoryUtil;
028 import com.liferay.portal.kernel.process.ProcessCallable;
029 import com.liferay.portal.kernel.process.ProcessChannel;
030 import com.liferay.portal.kernel.process.ProcessConfig;
031 import com.liferay.portal.kernel.process.ProcessException;
032 import com.liferay.portal.kernel.process.ProcessExecutor;
033 import com.liferay.portal.kernel.process.TerminationProcessException;
034 import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
035 import com.liferay.portal.kernel.util.NamedThreadFactory;
036 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
037 import com.liferay.portal.kernel.util.StreamUtil;
038
039 import java.io.EOFException;
040 import java.io.File;
041 import java.io.FileOutputStream;
042 import java.io.IOException;
043 import java.io.ObjectInputStream;
044 import java.io.ObjectOutputStream;
045 import java.io.Serializable;
046 import java.io.StreamCorruptedException;
047 import java.io.WriteAbortedException;
048
049 import java.util.ArrayList;
050 import java.util.Collections;
051 import java.util.HashSet;
052 import java.util.Iterator;
053 import java.util.List;
054 import java.util.Map;
055 import java.util.Map.Entry;
056 import java.util.Set;
057 import java.util.concurrent.Callable;
058 import java.util.concurrent.ConcurrentHashMap;
059 import java.util.concurrent.Future;
060 import java.util.concurrent.RejectedExecutionException;
061 import java.util.concurrent.TimeUnit;
062
063
066 public class LocalProcessExecutor implements ProcessExecutor {
067
068 public Set<Process> destroy() {
069 if (_threadPoolExecutor == null) {
070 return Collections.emptySet();
071 }
072
073 Set<Process> processes = Collections.emptySet();
074
075 synchronized (this) {
076 if (_threadPoolExecutor != null) {
077 processes = new HashSet<>();
078
079 _threadPoolExecutor.shutdownNow();
080
081
082
083
084
085
086
087
088 Set<Entry<Process, NoticeableFuture<?>>> set =
089 _managedProcesses.entrySet();
090
091 Iterator<Entry<Process, NoticeableFuture<?>>> iterator =
092 set.iterator();
093
094 while (iterator.hasNext()) {
095 Entry<Process, NoticeableFuture<?>> entry = iterator.next();
096
097 processes.add(entry.getKey());
098
099 NoticeableFuture<?> noticeableFuture = entry.getValue();
100
101 noticeableFuture.cancel(true);
102
103 iterator.remove();
104 }
105
106
107
108
109
110
111 _managedProcesses.clear();
112
113 _threadPoolExecutor = null;
114 }
115 }
116
117
118
119
120
121
122
123
124 return processes;
125 }
126
127 @Override
128 public <T extends Serializable> ProcessChannel<T> execute(
129 ProcessConfig processConfig, ProcessCallable<T> processCallable)
130 throws ProcessException {
131
132 try {
133 List<String> arguments = processConfig.getArguments();
134
135 List<String> commands = new ArrayList<>(arguments.size() + 4);
136
137 commands.add(processConfig.getJavaExecutable());
138 commands.add("-cp");
139 commands.add(processConfig.getBootstrapClassPath());
140 commands.addAll(arguments);
141 commands.add(LocalProcessLauncher.class.getName());
142
143 ProcessBuilder processBuilder = new ProcessBuilder(commands);
144
145 final Process process = processBuilder.start();
146
147 ObjectOutputStream bootstrapObjectOutputStream =
148 new ObjectOutputStream(process.getOutputStream());
149
150 bootstrapObjectOutputStream.writeObject(processCallable.toString());
151 bootstrapObjectOutputStream.writeObject(
152 processConfig.getRuntimeClassPath());
153
154 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
155 bootstrapObjectOutputStream);
156
157 objectOutputStream.writeObject(processCallable);
158
159 objectOutputStream.flush();
160
161 ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
162
163 AsyncBroker<Long, Serializable> asyncBroker = new AsyncBroker<>();
164
165 SubprocessReactor subprocessReactor = new SubprocessReactor(
166 process, processConfig.getReactClassLoader(), asyncBroker);
167
168 try {
169 NoticeableFuture<ProcessCallable<? extends Serializable>>
170 processCallableNoticeableFuture = threadPoolExecutor.submit(
171 subprocessReactor);
172
173 processCallableNoticeableFuture.addFutureListener(
174 new FutureListener
175 <ProcessCallable<? extends Serializable>>() {
176
177 @Override
178 public void complete(
179 Future<ProcessCallable<? extends Serializable>>
180 future) {
181
182 if (future.isCancelled()) {
183 process.destroy();
184 }
185 }
186
187 });
188
189
190
191
192 _managedProcesses.put(process, processCallableNoticeableFuture);
193
194 NoticeableFuture<T> noticeableFuture =
195 new NoticeableFutureConverter
196 <T, ProcessCallable<? extends Serializable>>(
197 processCallableNoticeableFuture) {
198
199 @Override
200 protected T convert(
201 ProcessCallable<? extends Serializable>
202 processCallable)
203 throws ProcessException {
204
205 if (processCallable instanceof
206 ReturnProcessCallable<?>) {
207
208 return (T)processCallable.call();
209 }
210
211 ExceptionProcessCallable
212 exceptionProcessCallable =
213 (ExceptionProcessCallable)
214 processCallable;
215
216 throw exceptionProcessCallable.call();
217 }
218
219 };
220
221 return new LocalProcessChannel<>(
222 noticeableFuture, objectOutputStream, asyncBroker);
223 }
224 catch (RejectedExecutionException ree) {
225 process.destroy();
226
227 throw new ProcessException(
228 "Cancelled execution because of a concurrent destroy", ree);
229 }
230 }
231 catch (IOException ioe) {
232 throw new ProcessException(ioe);
233 }
234 }
235
236 private ThreadPoolExecutor _getThreadPoolExecutor() {
237 if (_threadPoolExecutor != null) {
238 return _threadPoolExecutor;
239 }
240
241 synchronized (this) {
242 if (_threadPoolExecutor == null) {
243 _threadPoolExecutor = new ThreadPoolExecutor(
244 0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
245 Integer.MAX_VALUE, new AbortPolicy(),
246 new NamedThreadFactory(
247 LocalProcessExecutor.class.getName(),
248 Thread.MIN_PRIORITY,
249 PortalClassLoaderUtil.getClassLoader()),
250 new ThreadPoolHandlerAdapter());
251 }
252 }
253
254 return _threadPoolExecutor;
255 }
256
257 private static final Log _log = LogFactoryUtil.getLog(
258 LocalProcessExecutor.class);
259
260 private final Map<Process, NoticeableFuture<?>> _managedProcesses =
261 new ConcurrentHashMap<>();
262 private volatile ThreadPoolExecutor _threadPoolExecutor;
263
264 private class SubprocessReactor
265 implements Callable<ProcessCallable<? extends Serializable>> {
266
267 public SubprocessReactor(
268 Process process, ClassLoader reactClassLoader,
269 AsyncBroker<Long, Serializable> asyncBroker) {
270
271 _process = process;
272 _reactClassLoader = reactClassLoader;
273 _asyncBroker = asyncBroker;
274 }
275
276 @Override
277 public ProcessCallable<? extends Serializable> call() throws Exception {
278 ProcessCallable<?> resultProcessCallable = null;
279
280 AsyncBrokerThreadLocal.setAsyncBroker(_asyncBroker);
281
282 UnsyncBufferedInputStream unsyncBufferedInputStream =
283 new UnsyncBufferedInputStream(_process.getInputStream());
284
285 try {
286 ObjectInputStream objectInputStream = null;
287
288 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
289 new UnsyncByteArrayOutputStream();
290
291 while (true) {
292 try {
293
294
295
296 unsyncBufferedInputStream.mark(4);
297
298 objectInputStream = new ClassLoaderObjectInputStream(
299 unsyncBufferedInputStream, _reactClassLoader);
300
301
302
303
304 if (unsyncByteArrayOutputStream.size() > 0) {
305 if (_log.isWarnEnabled()) {
306 _log.warn(
307 "Found corrupt leading log " +
308 unsyncByteArrayOutputStream.toString());
309 }
310 }
311
312 unsyncByteArrayOutputStream = null;
313
314 break;
315 }
316 catch (StreamCorruptedException sce) {
317
318
319
320 unsyncBufferedInputStream.reset();
321
322 unsyncByteArrayOutputStream.write(
323 unsyncBufferedInputStream.read());
324 }
325 }
326
327 while (true) {
328 Object obj = null;
329
330 try {
331 obj = objectInputStream.readObject();
332 }
333 catch (WriteAbortedException wae) {
334 if (_log.isWarnEnabled()) {
335 _log.warn("Caught a write aborted exception", wae);
336 }
337
338 continue;
339 }
340
341 if (!(obj instanceof ProcessCallable)) {
342 if (_log.isInfoEnabled()) {
343 _log.info(
344 "Received a nonprocess callable piping back " +
345 obj);
346 }
347
348 continue;
349 }
350
351 ProcessCallable<?> processCallable =
352 (ProcessCallable<?>)obj;
353
354 if ((processCallable instanceof ExceptionProcessCallable) ||
355 (processCallable instanceof ReturnProcessCallable<?>)) {
356
357 resultProcessCallable = processCallable;
358
359 continue;
360 }
361
362 try {
363 Serializable returnValue = processCallable.call();
364
365 if (_log.isDebugEnabled()) {
366 _log.debug(
367 "Invoked generic process callable " +
368 processCallable + " with return value " +
369 returnValue);
370 }
371 }
372 catch (Throwable t) {
373 _log.error(
374 "Unable to invoke generic process callable", t);
375 }
376 }
377 }
378 catch (StreamCorruptedException sce) {
379 File file = File.createTempFile(
380 "corrupted-stream-dump-" + System.currentTimeMillis(),
381 ".log");
382
383 _log.error(
384 "Dumping content of corrupted object input stream to " +
385 file.getAbsolutePath(),
386 sce);
387
388 FileOutputStream fileOutputStream = new FileOutputStream(file);
389
390 StreamUtil.transfer(
391 unsyncBufferedInputStream, fileOutputStream);
392
393 throw new ProcessException(
394 "Corrupted object input stream", sce);
395 }
396 catch (EOFException eofe) {
397 throw new ProcessException(
398 "Subprocess piping back ended prematurely", eofe);
399 }
400 catch (Throwable t) {
401 _log.error("Abort subprocess piping", t);
402
403 throw t;
404 }
405 finally {
406 try {
407 int exitCode = _process.waitFor();
408
409 if (exitCode != 0) {
410 throw new TerminationProcessException(exitCode);
411 }
412 }
413 catch (InterruptedException ie) {
414 _process.destroy();
415
416 throw new ProcessException(
417 "Forcibly killed subprocess on interruption", ie);
418 }
419
420 _managedProcesses.remove(_process);
421
422 if (resultProcessCallable != null) {
423
424
425
426 return resultProcessCallable;
427 }
428
429 AsyncBrokerThreadLocal.removeAsyncBroker();
430 }
431 }
432
433 private final AsyncBroker<Long, Serializable> _asyncBroker;
434 private final Process _process;
435 private final ClassLoader _reactClassLoader;
436
437 }
438
439 }