001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process.local;
016    
017    import com.liferay.portal.kernel.concurrent.AbortPolicy;
018    import com.liferay.portal.kernel.concurrent.AsyncBroker;
019    import com.liferay.portal.kernel.concurrent.FutureListener;
020    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
021    import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
022    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
023    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
024    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
025    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
026    import com.liferay.portal.kernel.log.Log;
027    import com.liferay.portal.kernel.log.LogFactoryUtil;
028    import com.liferay.portal.kernel.process.ProcessCallable;
029    import com.liferay.portal.kernel.process.ProcessChannel;
030    import com.liferay.portal.kernel.process.ProcessConfig;
031    import com.liferay.portal.kernel.process.ProcessException;
032    import com.liferay.portal.kernel.process.ProcessExecutor;
033    import com.liferay.portal.kernel.process.TerminationProcessException;
034    import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
035    import com.liferay.portal.kernel.util.NamedThreadFactory;
036    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
037    import com.liferay.portal.kernel.util.StreamUtil;
038    
039    import java.io.EOFException;
040    import java.io.File;
041    import java.io.FileOutputStream;
042    import java.io.IOException;
043    import java.io.ObjectInputStream;
044    import java.io.ObjectOutputStream;
045    import java.io.Serializable;
046    import java.io.StreamCorruptedException;
047    import java.io.WriteAbortedException;
048    
049    import java.util.ArrayList;
050    import java.util.Collections;
051    import java.util.HashSet;
052    import java.util.Iterator;
053    import java.util.List;
054    import java.util.Map;
055    import java.util.Map.Entry;
056    import java.util.Set;
057    import java.util.concurrent.Callable;
058    import java.util.concurrent.ConcurrentHashMap;
059    import java.util.concurrent.Future;
060    import java.util.concurrent.RejectedExecutionException;
061    import java.util.concurrent.TimeUnit;
062    
063    /**
064     * @author Shuyang Zhou
065     */
066    public class LocalProcessExecutor implements ProcessExecutor {
067    
068            public Set<Process> destroy() {
069                    if (_threadPoolExecutor == null) {
070                            return Collections.emptySet();
071                    }
072    
073                    Set<Process> processes = Collections.emptySet();
074    
075                    synchronized (this) {
076                            if (_threadPoolExecutor != null) {
077                                    processes = new HashSet<>();
078    
079                                    _threadPoolExecutor.shutdownNow();
080    
081                                    // At this point, the thread pool will no longer take in any
082                                    // more subprocess reactors, so we know the list of managed
083                                    // processes is in a safe state. The worst case is that the
084                                    // destroyer thread and the thread pool thread concurrently
085                                    // destroy the same process, but this is JDK's job to ensure
086                                    // that processes are destroyed in a thread safe manner.
087    
088                                    Set<Entry<Process, NoticeableFuture<?>>> set =
089                                            _managedProcesses.entrySet();
090    
091                                    Iterator<Entry<Process, NoticeableFuture<?>>> iterator =
092                                            set.iterator();
093    
094                                    while (iterator.hasNext()) {
095                                            Entry<Process, NoticeableFuture<?>> entry = iterator.next();
096    
097                                            processes.add(entry.getKey());
098    
099                                            NoticeableFuture<?> noticeableFuture = entry.getValue();
100    
101                                            noticeableFuture.cancel(true);
102    
103                                            iterator.remove();
104                                    }
105    
106                                    // The current thread has a more comprehensive view of the list
107                                    // of managed processes than any thread pool thread. After the
108                                    // previous iteration, we are safe to clear the list of managed
109                                    // processes.
110    
111                                    _managedProcesses.clear();
112    
113                                    _threadPoolExecutor = null;
114                            }
115                    }
116    
117                    // Whip's instrument logic sees a label on a synchronized block exit and
118                    // asks for coverage, but it does not understand that this is actually
119                    // the same as exiting a method. To overcome this limitation, the code
120                    // logic has to explicitly leave the synchronized block before leaving
121                    // the method. This limitation will be removed in a future version of
122                    // Whip.
123    
124                    return processes;
125            }
126    
127            @Override
128            public <T extends Serializable> ProcessChannel<T> execute(
129                            ProcessConfig processConfig, ProcessCallable<T> processCallable)
130                    throws ProcessException {
131    
132                    try {
133                            List<String> arguments = processConfig.getArguments();
134    
135                            List<String> commands = new ArrayList<>(arguments.size() + 4);
136    
137                            commands.add(processConfig.getJavaExecutable());
138                            commands.add("-cp");
139                            commands.add(processConfig.getBootstrapClassPath());
140                            commands.addAll(arguments);
141                            commands.add(LocalProcessLauncher.class.getName());
142    
143                            ProcessBuilder processBuilder = new ProcessBuilder(commands);
144    
145                            final Process process = processBuilder.start();
146    
147                            ObjectOutputStream bootstrapObjectOutputStream =
148                                    new ObjectOutputStream(process.getOutputStream());
149    
150                            bootstrapObjectOutputStream.writeObject(processCallable.toString());
151                            bootstrapObjectOutputStream.writeObject(
152                                    processConfig.getRuntimeClassPath());
153    
154                            ObjectOutputStream objectOutputStream = new ObjectOutputStream(
155                                    bootstrapObjectOutputStream);
156    
157                            objectOutputStream.writeObject(processCallable);
158    
159                            objectOutputStream.flush();
160    
161                            ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
162    
163                            AsyncBroker<Long, Serializable> asyncBroker = new AsyncBroker<>();
164    
165                            SubprocessReactor subprocessReactor = new SubprocessReactor(
166                                    process, processConfig.getReactClassLoader(), asyncBroker);
167    
168                            try {
169                                    NoticeableFuture<ProcessCallable<? extends Serializable>>
170                                            processCallableNoticeableFuture = threadPoolExecutor.submit(
171                                                    subprocessReactor);
172    
173                                    processCallableNoticeableFuture.addFutureListener(
174                                            new FutureListener
175                                                    <ProcessCallable<? extends Serializable>>() {
176    
177                                                    @Override
178                                                    public void complete(
179                                                            Future<ProcessCallable<? extends Serializable>>
180                                                                    future) {
181    
182                                                            if (future.isCancelled()) {
183                                                                    process.destroy();
184                                                            }
185                                                    }
186    
187                                            });
188    
189                                    // Consider the newly created process as a managed process only
190                                    // after the subprocess reactor is taken by the thread pool
191    
192                                    _managedProcesses.put(process, processCallableNoticeableFuture);
193    
194                                    NoticeableFuture<T> noticeableFuture =
195                                            new NoticeableFutureConverter
196                                                    <T, ProcessCallable<? extends Serializable>>(
197                                                            processCallableNoticeableFuture) {
198    
199                                                    @Override
200                                                    protected T convert(
201                                                                    ProcessCallable<? extends Serializable>
202                                                                            processCallable)
203                                                            throws ProcessException {
204    
205                                                            if (processCallable instanceof
206                                                                            ReturnProcessCallable<?>) {
207    
208                                                                    return (T)processCallable.call();
209                                                            }
210    
211                                                            ExceptionProcessCallable exceptionProcessCallable =
212                                                                    (ExceptionProcessCallable)processCallable;
213    
214                                                            throw exceptionProcessCallable.call();
215                                                    }
216    
217                                            };
218    
219                                    return new LocalProcessChannel<>(
220                                            noticeableFuture, objectOutputStream, asyncBroker);
221                            }
222                            catch (RejectedExecutionException ree) {
223                                    process.destroy();
224    
225                                    throw new ProcessException(
226                                            "Cancelled execution because of a concurrent destroy", ree);
227                            }
228                    }
229                    catch (IOException ioe) {
230                            throw new ProcessException(ioe);
231                    }
232            }
233    
234            private ThreadPoolExecutor _getThreadPoolExecutor() {
235                    if (_threadPoolExecutor != null) {
236                            return _threadPoolExecutor;
237                    }
238    
239                    synchronized (this) {
240                            if (_threadPoolExecutor == null) {
241                                    _threadPoolExecutor = new ThreadPoolExecutor(
242                                            0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
243                                            Integer.MAX_VALUE, new AbortPolicy(),
244                                            new NamedThreadFactory(
245                                                    LocalProcessExecutor.class.getName(),
246                                                    Thread.MIN_PRIORITY,
247                                                    PortalClassLoaderUtil.getClassLoader()),
248                                            new ThreadPoolHandlerAdapter());
249                            }
250                    }
251    
252                    return _threadPoolExecutor;
253            }
254    
255            private static final Log _log = LogFactoryUtil.getLog(
256                    LocalProcessExecutor.class);
257    
258            private final Map<Process, NoticeableFuture<?>> _managedProcesses =
259                    new ConcurrentHashMap<>();
260            private volatile ThreadPoolExecutor _threadPoolExecutor;
261    
262            private class SubprocessReactor
263                    implements Callable<ProcessCallable<? extends Serializable>> {
264    
265                    public SubprocessReactor(
266                            Process process, ClassLoader reactClassLoader,
267                            AsyncBroker<Long, Serializable> asyncBroker) {
268    
269                            _process = process;
270                            _reactClassLoader = reactClassLoader;
271                            _asyncBroker = asyncBroker;
272                    }
273    
274                    @Override
275                    public ProcessCallable<? extends Serializable> call() throws Exception {
276                            ProcessCallable<?> resultProcessCallable = null;
277    
278                            AsyncBrokerThreadLocal.setAsyncBroker(_asyncBroker);
279    
280                            UnsyncBufferedInputStream unsyncBufferedInputStream =
281                                    new UnsyncBufferedInputStream(_process.getInputStream());
282    
283                            try {
284                                    ObjectInputStream objectInputStream = null;
285    
286                                    UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
287                                            new UnsyncByteArrayOutputStream();
288    
289                                    while (true) {
290                                            try {
291    
292                                                    // Be ready for a bad header
293    
294                                                    unsyncBufferedInputStream.mark(4);
295    
296                                                    objectInputStream = new ClassLoaderObjectInputStream(
297                                                            unsyncBufferedInputStream, _reactClassLoader);
298    
299                                                    // Found the beginning of the object input stream. Flush
300                                                    // out corrupted log if necessary.
301    
302                                                    if (unsyncByteArrayOutputStream.size() > 0) {
303                                                            if (_log.isWarnEnabled()) {
304                                                                    _log.warn(
305                                                                            "Found corrupt leading log " +
306                                                                                    unsyncByteArrayOutputStream.toString());
307                                                            }
308                                                    }
309    
310                                                    unsyncByteArrayOutputStream = null;
311    
312                                                    break;
313                                            }
314                                            catch (StreamCorruptedException sce) {
315    
316                                                    // Collecting bad header as log information
317    
318                                                    unsyncBufferedInputStream.reset();
319    
320                                                    unsyncByteArrayOutputStream.write(
321                                                            unsyncBufferedInputStream.read());
322                                            }
323                                    }
324    
325                                    while (true) {
326                                            Object obj = null;
327    
328                                            try {
329                                                    obj = objectInputStream.readObject();
330                                            }
331                                            catch (WriteAbortedException wae) {
332                                                    if (_log.isWarnEnabled()) {
333                                                            _log.warn("Caught a write aborted exception", wae);
334                                                    }
335    
336                                                    continue;
337                                            }
338    
339                                            if (!(obj instanceof ProcessCallable)) {
340                                                    if (_log.isInfoEnabled()) {
341                                                            _log.info(
342                                                                    "Received a nonprocess callable piping back " +
343                                                                            obj);
344                                                    }
345    
346                                                    continue;
347                                            }
348    
349                                            ProcessCallable<?> processCallable =
350                                                    (ProcessCallable<?>)obj;
351    
352                                            if ((processCallable instanceof ExceptionProcessCallable) ||
353                                                    (processCallable instanceof ReturnProcessCallable<?>)) {
354    
355                                                    resultProcessCallable = processCallable;
356    
357                                                    continue;
358                                            }
359    
360                                            try {
361                                                    Serializable returnValue = processCallable.call();
362    
363                                                    if (_log.isDebugEnabled()) {
364                                                            _log.debug(
365                                                                    "Invoked generic process callable " +
366                                                                            processCallable + " with return value " +
367                                                                                    returnValue);
368                                                    }
369                                            }
370                                            catch (Throwable t) {
371                                                    _log.error(
372                                                            "Unable to invoke generic process callable", t);
373                                            }
374                                    }
375                            }
376                            catch (StreamCorruptedException sce) {
377                                    File file = File.createTempFile(
378                                            "corrupted-stream-dump-" + System.currentTimeMillis(),
379                                            ".log");
380    
381                                    _log.error(
382                                            "Dumping content of corrupted object input stream to " +
383                                                    file.getAbsolutePath(),
384                                            sce);
385    
386                                    FileOutputStream fileOutputStream = new FileOutputStream(file);
387    
388                                    StreamUtil.transfer(
389                                            unsyncBufferedInputStream, fileOutputStream);
390    
391                                    throw new ProcessException(
392                                            "Corrupted object input stream", sce);
393                            }
394                            catch (EOFException eofe) {
395                                    throw new ProcessException(
396                                            "Subprocess piping back ended prematurely", eofe);
397                            }
398                            catch (Throwable t) {
399                                    _log.error("Abort subprocess piping", t);
400    
401                                    throw t;
402                            }
403                            finally {
404                                    try {
405                                            int exitCode = _process.waitFor();
406    
407                                            if (exitCode != 0) {
408                                                    throw new TerminationProcessException(exitCode);
409                                            }
410                                    }
411                                    catch (InterruptedException ie) {
412                                            _process.destroy();
413    
414                                            throw new ProcessException(
415                                                    "Forcibly killed subprocess on interruption", ie);
416                                    }
417    
418                                    _managedProcesses.remove(_process);
419    
420                                    if (resultProcessCallable != null) {
421    
422                                            // Override previous process exception if there was one
423    
424                                            return resultProcessCallable;
425                                    }
426    
427                                    AsyncBrokerThreadLocal.removeAsyncBroker();
428                            }
429                    }
430    
431                    private final AsyncBroker<Long, Serializable> _asyncBroker;
432                    private final Process _process;
433                    private final ClassLoader _reactClassLoader;
434    
435            }
436    
437    }