001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process.local;
016    
017    import com.liferay.portal.kernel.concurrent.AbortPolicy;
018    import com.liferay.portal.kernel.concurrent.AsyncBroker;
019    import com.liferay.portal.kernel.concurrent.FutureListener;
020    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
021    import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
022    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
023    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
024    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
025    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
026    import com.liferay.portal.kernel.log.Log;
027    import com.liferay.portal.kernel.log.LogFactoryUtil;
028    import com.liferay.portal.kernel.process.ProcessCallable;
029    import com.liferay.portal.kernel.process.ProcessChannel;
030    import com.liferay.portal.kernel.process.ProcessConfig;
031    import com.liferay.portal.kernel.process.ProcessException;
032    import com.liferay.portal.kernel.process.ProcessExecutor;
033    import com.liferay.portal.kernel.process.TerminationProcessException;
034    import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
035    import com.liferay.portal.kernel.util.NamedThreadFactory;
036    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
037    import com.liferay.portal.kernel.util.StreamUtil;
038    
039    import java.io.EOFException;
040    import java.io.File;
041    import java.io.FileOutputStream;
042    import java.io.IOException;
043    import java.io.ObjectInputStream;
044    import java.io.ObjectOutputStream;
045    import java.io.Serializable;
046    import java.io.StreamCorruptedException;
047    
048    import java.util.ArrayList;
049    import java.util.Collection;
050    import java.util.Iterator;
051    import java.util.List;
052    import java.util.Map;
053    import java.util.concurrent.Callable;
054    import java.util.concurrent.ConcurrentHashMap;
055    import java.util.concurrent.Future;
056    import java.util.concurrent.RejectedExecutionException;
057    import java.util.concurrent.TimeUnit;
058    
059    /**
060     * @author Shuyang Zhou
061     */
062    public class LocalProcessExecutor implements ProcessExecutor {
063    
064            public void destroy() {
065                    if (_threadPoolExecutor == null) {
066                            return;
067                    }
068    
069                    synchronized (this) {
070                            if (_threadPoolExecutor != null) {
071                                    _threadPoolExecutor.shutdownNow();
072    
073                                    // At this point, the thread pool will no longer take in any
074                                    // more subprocess reactors, so we know the list of managed
075                                    // processes is in a safe state. The worst case is that the
076                                    // destroyer thread and the thread pool thread concurrently
077                                    // destroy the same process, but this is JDK's job to ensure
078                                    // that processes are destroyed in a thread safe manner.
079    
080                                    Collection<NoticeableFuture<?>> values =
081                                            _managedProcesses.values();
082    
083                                    Iterator<NoticeableFuture<?>> iterator = values.iterator();
084    
085                                    while (iterator.hasNext()) {
086                                            NoticeableFuture<?> noticeableFuture = iterator.next();
087    
088                                            noticeableFuture.cancel(true);
089    
090                                            iterator.remove();
091                                    }
092    
093                                    // The current thread has a more comprehensive view of the list
094                                    // of managed processes than any thread pool thread. After the
095                                    // previous iteration, we are safe to clear the list of managed
096                                    // processes.
097    
098                                    _managedProcesses.clear();
099    
100                                    _threadPoolExecutor = null;
101                            }
102                    }
103            }
104    
105            @Override
106            public <T extends Serializable> ProcessChannel<T> execute(
107                            ProcessConfig processConfig, ProcessCallable<T> processCallable)
108                    throws ProcessException {
109    
110                    try {
111                            List<String> arguments = processConfig.getArguments();
112    
113                            List<String> commands = new ArrayList<String>(arguments.size() + 4);
114    
115                            commands.add(processConfig.getJavaExecutable());
116                            commands.add("-cp");
117                            commands.add(processConfig.getBootstrapClassPath());
118                            commands.addAll(arguments);
119                            commands.add(LocalProcessLauncher.class.getName());
120    
121                            ProcessBuilder processBuilder = new ProcessBuilder(commands);
122    
123                            final Process process = processBuilder.start();
124    
125                            ObjectOutputStream bootstrapObjectOutputStream =
126                                    new ObjectOutputStream(process.getOutputStream());
127    
128                            bootstrapObjectOutputStream.writeObject(processCallable.toString());
129                            bootstrapObjectOutputStream.writeObject(
130                                    processConfig.getRuntimeClassPath());
131    
132                            ObjectOutputStream objectOutputStream = new ObjectOutputStream(
133                                    bootstrapObjectOutputStream);
134    
135                            objectOutputStream.writeObject(processCallable);
136    
137                            objectOutputStream.flush();
138    
139                            ThreadPoolExecutor threadPoolExecutor = _getThreadPoolExecutor();
140    
141                            AsyncBroker<Long, Serializable> asyncBroker =
142                                    new AsyncBroker<Long, Serializable>();
143    
144                            SubprocessReactor subprocessReactor = new SubprocessReactor(
145                                    process, processConfig.getReactClassLoader(), asyncBroker);
146    
147                            try {
148                                    NoticeableFuture<ProcessCallable<? extends Serializable>>
149                                            processCallableNoticeableFuture = threadPoolExecutor.submit(
150                                                    subprocessReactor);
151    
152                                    processCallableNoticeableFuture.addFutureListener(
153                                            new FutureListener
154                                                    <ProcessCallable<? extends Serializable>>() {
155    
156                                                    @Override
157                                                    public void complete(
158                                                            Future<ProcessCallable<? extends Serializable>>
159                                                                    future) {
160    
161                                                            if (future.isCancelled()) {
162                                                                    process.destroy();
163                                                            }
164                                                    }
165    
166                                            });
167    
168                                    // Consider the newly created process as a managed process only
169                                    // after the subprocess reactor is taken by the thread pool
170    
171                                    _managedProcesses.put(process, processCallableNoticeableFuture);
172    
173                                    NoticeableFuture<T> noticeableFuture =
174                                            new NoticeableFutureConverter
175                                                    <T, ProcessCallable<? extends Serializable>>(
176                                                            processCallableNoticeableFuture) {
177    
178                                                            @Override
179                                                            protected T convert(
180                                                                            ProcessCallable<? extends Serializable>
181                                                                                    processCallable)
182                                                                    throws ProcessException {
183    
184                                                                    if (processCallable instanceof
185                                                                                    ReturnProcessCallable<?>) {
186    
187                                                                            return (T)processCallable.call();
188                                                                    }
189    
190                                                                    ExceptionProcessCallable
191                                                                            exceptionProcessCallable =
192                                                                                    (ExceptionProcessCallable)
193                                                                                            processCallable;
194    
195                                                                    throw exceptionProcessCallable.call();
196                                                            }
197    
198                                                    };
199    
200                                    return new LocalProcessChannel<T>(
201                                            noticeableFuture, objectOutputStream, asyncBroker);
202                            }
203                            catch (RejectedExecutionException ree) {
204                                    process.destroy();
205    
206                                    throw new ProcessException(
207                                            "Cancelled execution because of a concurrent destroy", ree);
208                            }
209                    }
210                    catch (IOException ioe) {
211                            throw new ProcessException(ioe);
212                    }
213            }
214    
215            private ThreadPoolExecutor _getThreadPoolExecutor() {
216                    if (_threadPoolExecutor != null) {
217                            return _threadPoolExecutor;
218                    }
219    
220                    synchronized (this) {
221                            if (_threadPoolExecutor == null) {
222                                    _threadPoolExecutor = new ThreadPoolExecutor(
223                                            0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, true,
224                                            Integer.MAX_VALUE, new AbortPolicy(),
225                                            new NamedThreadFactory(
226                                                    LocalProcessExecutor.class.getName(),
227                                                    Thread.MIN_PRIORITY,
228                                                    PortalClassLoaderUtil.getClassLoader()),
229                                            new ThreadPoolHandlerAdapter());
230                            }
231                    }
232    
233                    return _threadPoolExecutor;
234            }
235    
236            private static final Log _log = LogFactoryUtil.getLog(
237                    LocalProcessExecutor.class);
238    
239            private final Map<Process, NoticeableFuture<?>> _managedProcesses =
240                    new ConcurrentHashMap<Process, NoticeableFuture<?>>();
241            private volatile ThreadPoolExecutor _threadPoolExecutor;
242    
243            private class SubprocessReactor
244                    implements Callable<ProcessCallable<? extends Serializable>> {
245    
246                    public SubprocessReactor(
247                            Process process, ClassLoader reactClassLoader,
248                            AsyncBroker<Long, Serializable> asyncBroker) {
249    
250                            _process = process;
251                            _reactClassLoader = reactClassLoader;
252                            _asyncBroker = asyncBroker;
253                    }
254    
255                    @Override
256                    public ProcessCallable<? extends Serializable> call() throws Exception {
257                            ProcessCallable<?> resultProcessCallable = null;
258    
259                            AsyncBrokerThreadLocal.setAsyncBroker(_asyncBroker);
260    
261                            UnsyncBufferedInputStream unsyncBufferedInputStream =
262                                    new UnsyncBufferedInputStream(_process.getInputStream());
263    
264                            try {
265                                    ObjectInputStream objectInputStream = null;
266    
267                                    UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
268                                            new UnsyncByteArrayOutputStream();
269    
270                                    while (true) {
271                                            try {
272    
273                                                    // Be ready for a bad header
274    
275                                                    unsyncBufferedInputStream.mark(4);
276    
277                                                    objectInputStream = new ClassLoaderObjectInputStream(
278                                                            unsyncBufferedInputStream, _reactClassLoader);
279    
280                                                    // Found the beginning of the object input stream. Flush
281                                                    // out corrupted log if necessary.
282    
283                                                    if (unsyncByteArrayOutputStream.size() > 0) {
284                                                            if (_log.isWarnEnabled()) {
285                                                                    _log.warn(
286                                                                            "Found corrupt leading log " +
287                                                                                    unsyncByteArrayOutputStream.toString());
288                                                            }
289                                                    }
290    
291                                                    unsyncByteArrayOutputStream = null;
292    
293                                                    break;
294                                            }
295                                            catch (StreamCorruptedException sce) {
296    
297                                                    // Collecting bad header as log information
298    
299                                                    unsyncBufferedInputStream.reset();
300    
301                                                    unsyncByteArrayOutputStream.write(
302                                                            unsyncBufferedInputStream.read());
303                                            }
304                                    }
305    
306                                    while (true) {
307                                            ProcessCallable<?> processCallable =
308                                                    (ProcessCallable<?>)objectInputStream.readObject();
309    
310                                            if ((processCallable instanceof ExceptionProcessCallable) ||
311                                                    (processCallable instanceof ReturnProcessCallable<?>)) {
312    
313                                                    resultProcessCallable = processCallable;
314    
315                                                    continue;
316                                            }
317    
318                                            try {
319                                                    Serializable returnValue = processCallable.call();
320    
321                                                    if (_log.isDebugEnabled()) {
322                                                            _log.debug(
323                                                                    "Invoked generic process callable " +
324                                                                            processCallable + " with return value " +
325                                                                                    returnValue);
326                                                    }
327                                            }
328                                            catch (Throwable t) {
329                                                    _log.error(
330                                                            "Unable to invoke generic process callable", t);
331                                            }
332                                    }
333                            }
334                            catch (StreamCorruptedException sce) {
335                                    File file = File.createTempFile(
336                                            "corrupted-stream-dump-" + System.currentTimeMillis(),
337                                            ".log");
338    
339                                    _log.error(
340                                            "Dumping content of corrupted object input stream to " +
341                                                    file.getAbsolutePath(),
342                                            sce);
343    
344                                    FileOutputStream fileOutputStream = new FileOutputStream(file);
345    
346                                    StreamUtil.transfer(
347                                            unsyncBufferedInputStream, fileOutputStream);
348    
349                                    throw new ProcessException(
350                                            "Corrupted object input stream", sce);
351                            }
352                            catch (EOFException eofe) {
353                                    throw new ProcessException(
354                                            "Subprocess piping back ended prematurely", eofe);
355                            }
356                            catch (Throwable t) {
357                                    _log.error("Abort subprocess piping", t);
358    
359                                    throw t;
360                            }
361                            finally {
362                                    try {
363                                            int exitCode = _process.waitFor();
364    
365                                            if (exitCode != 0) {
366                                                    throw new TerminationProcessException(exitCode);
367                                            }
368                                    }
369                                    catch (InterruptedException ie) {
370                                            _process.destroy();
371    
372                                            throw new ProcessException(
373                                                    "Forcibly killed subprocess on interruption", ie);
374                                    }
375    
376                                    _managedProcesses.remove(_process);
377    
378                                    if (resultProcessCallable != null) {
379    
380                                            // Override previous process exception if there was one
381    
382                                            return resultProcessCallable;
383                                    }
384    
385                                    AsyncBrokerThreadLocal.removeAsyncBroker();
386                            }
387                    }
388    
389                    private final AsyncBroker<Long, Serializable> _asyncBroker;
390                    private final Process _process;
391                    private final ClassLoader _reactClassLoader;
392    
393            }
394    
395    }