001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process.local;
016    
017    import com.liferay.portal.kernel.concurrent.AbortPolicy;
018    import com.liferay.portal.kernel.concurrent.AsyncBroker;
019    import com.liferay.portal.kernel.concurrent.FutureListener;
020    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
021    import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
022    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
023    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
024    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
025    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
026    import com.liferay.portal.kernel.log.Log;
027    import com.liferay.portal.kernel.log.LogFactoryUtil;
028    import com.liferay.portal.kernel.process.ProcessCallable;
029    import com.liferay.portal.kernel.process.ProcessChannel;
030    import com.liferay.portal.kernel.process.ProcessConfig;
031    import com.liferay.portal.kernel.process.ProcessException;
032    import com.liferay.portal.kernel.process.ProcessExecutor;
033    import com.liferay.portal.kernel.process.TerminationProcessException;
034    import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
035    import com.liferay.portal.kernel.util.NamedThreadFactory;
036    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
037    import com.liferay.portal.kernel.util.StreamUtil;
038    
039    import java.io.EOFException;
040    import java.io.File;
041    import java.io.FileOutputStream;
042    import java.io.IOException;
043    import java.io.ObjectInputStream;
044    import java.io.ObjectOutputStream;
045    import java.io.Serializable;
046    import java.io.StreamCorruptedException;
047    import java.io.WriteAbortedException;
048    
049    import java.util.ArrayList;
050    import java.util.Collections;
051    import java.util.HashSet;
052    import java.util.Iterator;
053    import java.util.List;
054    import java.util.Map;
055    import java.util.Map.Entry;
056    import java.util.Set;
057    import java.util.concurrent.Callable;
058    import java.util.concurrent.ConcurrentHashMap;
059    import java.util.concurrent.Future;
060    import java.util.concurrent.RejectedExecutionException;
061    import java.util.concurrent.TimeUnit;
062    
063    /**
064     * @author Shuyang Zhou
065     */
066    public class LocalProcessExecutor implements ProcessExecutor {
067    
068            public Set<Process> destroy() {
069                    if (_threadPoolExecutor == null) {
070                            return Collections.emptySet();
071                    }
072    
073                    Set<Process> processes = Collections.emptySet();
074    
075                    synchronized (this) {
076                            if (_threadPoolExecutor != null) {
077                                    processes = new HashSet<>();
078    
079                                    _threadPoolExecutor.shutdownNow();
080    
081                                    // At this point, the thread pool will no longer take in any
082                                    // more subprocess reactors, so we know the list of managed
083                                    // processes is in a safe state. The worst case is that the
084                                    // destroyer thread and the thread pool thread concurrently
085                                    // destroy the same process, but this is JDK's job to ensure
086                                    // that processes are destroyed in a thread safe manner.
087    
088                                    Set<Entry<Process, NoticeableFuture<?>>> set =
089                                            _managedProcesses.entrySet();
090    
091                                    Iterator<Entry<Process, NoticeableFuture<?>>> iterator =
092                                            set.iterator();
093    
094                                    while (iterator.hasNext()) {
095                                            Entry<Process, NoticeableFuture<?>> entry = iterator.next();
096    
097                                            processes.add(entry.getKey());
098    
099                                            NoticeableFuture<?> noticeableFuture = entry.getValue();
100    
101                                            noticeableFuture.cancel(true);
102    
103                                            iterator.remove();
104                                    }
105    
106                                    // The current thread has a more comprehensive view of the list
107                                    // of managed processes than any thread pool thread. After the
108                                    // previous iteration, we are safe to clear the list of managed
109                                    // processes.
110    
111                                    _managedProcesses.clear();
112    
113                                    _threadPoolExecutor = null;
114                            }
115                    }
116    
117                    // Whip's instrument logic sees a label on a synchronized block exit and
118                    // asks for coverage, but it does not understand that this is actually
119                    // the same as exiting a method. To overcome this limitation, the code
120                    // logic has to explicitly leave the synchronized block before leaving
121                    // the method. This limitation will be removed in a future version of
122                    // Whip.
123    
124                    return processes;
125            }
126    
127            @Override
128            public <T extends Serializable> ProcessChannel<T> execute(
129                            ProcessConfig processConfig, ProcessCallable<T> processCallable)
130                    throws ProcessException {
131    
132                    try {
133                            List<String> arguments = processConfig.getArguments();
134    
135                            List<String> commands = new ArrayList<>(arguments.size() + 4);
136    
137                            commands.add(processConfig.getJavaExecutable());
138                            commands.add("-cp");
139                            commands.add(processConfig.getBootstrapClassPath());
140                            commands.addAll(arguments);
141                            commands.add(LocalProcessLauncher.class.getName());
142    
143                            ProcessBuilder processBuilder = new ProcessBuilder(commands);
144    
145                            final Process process = processBuilder.start();
146    
147                            ObjectOutputStream bootstrapObjectOutputStream =
148                                    new ObjectOutputStream(process.getOutputStream());
149    
150                            bootstrapObjectOutputStream.writeObject(processCallable.toString());
151                            bootstrapObjectOutputStream.writeObject(
152                                    processConfig.getRuntimeClassPath());
153    
154                            ObjectOutputStream objectOutputStream = new ObjectOutputStream(
155                                    bootstrapObjectOutputStream);
156    
157                            objectOutputStream.writeObject(processCallable);
158    
159                            objectOutputStream.flush();
160    
161                            ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
162    
163                            AsyncBroker<Long, Serializable> asyncBroker = new AsyncBroker<>();
164    
165                            SubprocessReactor subprocessReactor = new SubprocessReactor(
166                                    process, processConfig.getReactClassLoader(), asyncBroker);
167    
168                            try {
169                                    NoticeableFuture<ProcessCallable<? extends Serializable>>
170                                            processCallableNoticeableFuture = threadPoolExecutor.submit(
171                                                    subprocessReactor);
172    
173                                    processCallableNoticeableFuture.addFutureListener(
174                                            new FutureListener
175                                                    <ProcessCallable<? extends Serializable>>() {
176    
177                                                    @Override
178                                                    public void complete(
179                                                            Future<ProcessCallable<? extends Serializable>>
180                                                                    future) {
181    
182                                                            if (future.isCancelled()) {
183                                                                    process.destroy();
184                                                            }
185                                                    }
186    
187                                            });
188    
189                                    // Consider the newly created process as a managed process only
190                                    // after the subprocess reactor is taken by the thread pool
191    
192                                    _managedProcesses.put(process, processCallableNoticeableFuture);
193    
194                                    NoticeableFuture<T> noticeableFuture =
195                                            new NoticeableFutureConverter
196                                                    <T, ProcessCallable<? extends Serializable>>(
197                                                            processCallableNoticeableFuture) {
198    
199                                                            @Override
200                                                            protected T convert(
201                                                                            ProcessCallable<? extends Serializable>
202                                                                                    processCallable)
203                                                                    throws ProcessException {
204    
205                                                                    if (processCallable instanceof
206                                                                                    ReturnProcessCallable<?>) {
207    
208                                                                            return (T)processCallable.call();
209                                                                    }
210    
211                                                                    ExceptionProcessCallable
212                                                                            exceptionProcessCallable =
213                                                                                    (ExceptionProcessCallable)
214                                                                                            processCallable;
215    
216                                                                    throw exceptionProcessCallable.call();
217                                                            }
218    
219                                                    };
220    
221                                    return new LocalProcessChannel<>(
222                                            noticeableFuture, objectOutputStream, asyncBroker);
223                            }
224                            catch (RejectedExecutionException ree) {
225                                    process.destroy();
226    
227                                    throw new ProcessException(
228                                            "Cancelled execution because of a concurrent destroy", ree);
229                            }
230                    }
231                    catch (IOException ioe) {
232                            throw new ProcessException(ioe);
233                    }
234            }
235    
236            private ThreadPoolExecutor _getThreadPoolExecutor() {
237                    if (_threadPoolExecutor != null) {
238                            return _threadPoolExecutor;
239                    }
240    
241                    synchronized (this) {
242                            if (_threadPoolExecutor == null) {
243                                    _threadPoolExecutor = new ThreadPoolExecutor(
244                                            0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
245                                            Integer.MAX_VALUE, new AbortPolicy(),
246                                            new NamedThreadFactory(
247                                                    LocalProcessExecutor.class.getName(),
248                                                    Thread.MIN_PRIORITY,
249                                                    PortalClassLoaderUtil.getClassLoader()),
250                                            new ThreadPoolHandlerAdapter());
251                            }
252                    }
253    
254                    return _threadPoolExecutor;
255            }
256    
257            private static final Log _log = LogFactoryUtil.getLog(
258                    LocalProcessExecutor.class);
259    
260            private final Map<Process, NoticeableFuture<?>> _managedProcesses =
261                    new ConcurrentHashMap<>();
262            private volatile ThreadPoolExecutor _threadPoolExecutor;
263    
264            private class SubprocessReactor
265                    implements Callable<ProcessCallable<? extends Serializable>> {
266    
267                    public SubprocessReactor(
268                            Process process, ClassLoader reactClassLoader,
269                            AsyncBroker<Long, Serializable> asyncBroker) {
270    
271                            _process = process;
272                            _reactClassLoader = reactClassLoader;
273                            _asyncBroker = asyncBroker;
274                    }
275    
276                    @Override
277                    public ProcessCallable<? extends Serializable> call() throws Exception {
278                            ProcessCallable<?> resultProcessCallable = null;
279    
280                            AsyncBrokerThreadLocal.setAsyncBroker(_asyncBroker);
281    
282                            UnsyncBufferedInputStream unsyncBufferedInputStream =
283                                    new UnsyncBufferedInputStream(_process.getInputStream());
284    
285                            try {
286                                    ObjectInputStream objectInputStream = null;
287    
288                                    UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
289                                            new UnsyncByteArrayOutputStream();
290    
291                                    while (true) {
292                                            try {
293    
294                                                    // Be ready for a bad header
295    
296                                                    unsyncBufferedInputStream.mark(4);
297    
298                                                    objectInputStream = new ClassLoaderObjectInputStream(
299                                                            unsyncBufferedInputStream, _reactClassLoader);
300    
301                                                    // Found the beginning of the object input stream. Flush
302                                                    // out corrupted log if necessary.
303    
304                                                    if (unsyncByteArrayOutputStream.size() > 0) {
305                                                            if (_log.isWarnEnabled()) {
306                                                                    _log.warn(
307                                                                            "Found corrupt leading log " +
308                                                                                    unsyncByteArrayOutputStream.toString());
309                                                            }
310                                                    }
311    
312                                                    unsyncByteArrayOutputStream = null;
313    
314                                                    break;
315                                            }
316                                            catch (StreamCorruptedException sce) {
317    
318                                                    // Collecting bad header as log information
319    
320                                                    unsyncBufferedInputStream.reset();
321    
322                                                    unsyncByteArrayOutputStream.write(
323                                                            unsyncBufferedInputStream.read());
324                                            }
325                                    }
326    
327                                    while (true) {
328                                            Object obj = null;
329    
330                                            try {
331                                                    obj = objectInputStream.readObject();
332                                            }
333                                            catch (WriteAbortedException wae) {
334                                                    if (_log.isWarnEnabled()) {
335                                                            _log.warn("Caught a write aborted exception", wae);
336                                                    }
337    
338                                                    continue;
339                                            }
340    
341                                            if (!(obj instanceof ProcessCallable)) {
342                                                    if (_log.isInfoEnabled()) {
343                                                            _log.info(
344                                                                    "Received a nonprocess callable piping back " +
345                                                                            obj);
346                                                    }
347    
348                                                    continue;
349                                            }
350    
351                                            ProcessCallable<?> processCallable =
352                                                    (ProcessCallable<?>)obj;
353    
354                                            if ((processCallable instanceof ExceptionProcessCallable) ||
355                                                    (processCallable instanceof ReturnProcessCallable<?>)) {
356    
357                                                    resultProcessCallable = processCallable;
358    
359                                                    continue;
360                                            }
361    
362                                            try {
363                                                    Serializable returnValue = processCallable.call();
364    
365                                                    if (_log.isDebugEnabled()) {
366                                                            _log.debug(
367                                                                    "Invoked generic process callable " +
368                                                                            processCallable + " with return value " +
369                                                                                    returnValue);
370                                                    }
371                                            }
372                                            catch (Throwable t) {
373                                                    _log.error(
374                                                            "Unable to invoke generic process callable", t);
375                                            }
376                                    }
377                            }
378                            catch (StreamCorruptedException sce) {
379                                    File file = File.createTempFile(
380                                            "corrupted-stream-dump-" + System.currentTimeMillis(),
381                                            ".log");
382    
383                                    _log.error(
384                                            "Dumping content of corrupted object input stream to " +
385                                                    file.getAbsolutePath(),
386                                            sce);
387    
388                                    FileOutputStream fileOutputStream = new FileOutputStream(file);
389    
390                                    StreamUtil.transfer(
391                                            unsyncBufferedInputStream, fileOutputStream);
392    
393                                    throw new ProcessException(
394                                            "Corrupted object input stream", sce);
395                            }
396                            catch (EOFException eofe) {
397                                    throw new ProcessException(
398                                            "Subprocess piping back ended prematurely", eofe);
399                            }
400                            catch (Throwable t) {
401                                    _log.error("Abort subprocess piping", t);
402    
403                                    throw t;
404                            }
405                            finally {
406                                    try {
407                                            int exitCode = _process.waitFor();
408    
409                                            if (exitCode != 0) {
410                                                    throw new TerminationProcessException(exitCode);
411                                            }
412                                    }
413                                    catch (InterruptedException ie) {
414                                            _process.destroy();
415    
416                                            throw new ProcessException(
417                                                    "Forcibly killed subprocess on interruption", ie);
418                                    }
419    
420                                    _managedProcesses.remove(_process);
421    
422                                    if (resultProcessCallable != null) {
423    
424                                            // Override previous process exception if there was one
425    
426                                            return resultProcessCallable;
427                                    }
428    
429                                    AsyncBrokerThreadLocal.removeAsyncBroker();
430                            }
431                    }
432    
433                    private final AsyncBroker<Long, Serializable> _asyncBroker;
434                    private final Process _process;
435                    private final ClassLoader _reactClassLoader;
436    
437            }
438    
439    }