001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
018    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
019    import com.liferay.portal.kernel.log.Log;
020    import com.liferay.portal.kernel.log.LogFactoryUtil;
021    import com.liferay.portal.kernel.process.log.ProcessOutputStream;
022    import com.liferay.portal.kernel.util.NamedThreadFactory;
023    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
024    
025    import java.io.EOFException;
026    import java.io.IOException;
027    import java.io.InputStream;
028    import java.io.ObjectInputStream;
029    import java.io.ObjectOutputStream;
030    import java.io.OutputStream;
031    import java.io.PrintStream;
032    import java.io.Serializable;
033    import java.io.StreamCorruptedException;
034    
035    import java.util.concurrent.Callable;
036    import java.util.concurrent.ExecutorService;
037    import java.util.concurrent.Executors;
038    import java.util.concurrent.Future;
039    
040    /**
041     * @author Shuyang Zhou
042     */
043    public class ProcessExecutor {
044    
045            public static <T extends Serializable> T execute(
046                            ProcessCallable<T> processCallable, String classPath)
047                    throws ProcessException {
048    
049                    try {
050                            ProcessBuilder processBuilder = new ProcessBuilder(
051                                    "java", "-cp", classPath, ProcessExecutor.class.getName());
052    
053                            Process process = processBuilder.start();
054    
055                            _writeObject(process.getOutputStream(), processCallable);
056    
057                            ExecutorService executorService = _getExecutorService();
058    
059                            SubprocessReactor subprocessReactor = new SubprocessReactor(
060                                    process.getInputStream());
061    
062                            Future<ProcessCallable<?>> futureResponseProcessCallable =
063                                    executorService.submit(subprocessReactor);
064    
065                            int exitCode = process.waitFor();
066    
067                            if (exitCode != 0) {
068                                    throw new ProcessException(
069                                            "Subprocess terminated with exit code " + exitCode);
070                            }
071    
072                            ProcessCallable<?> responseProcessCallable =
073                                    futureResponseProcessCallable.get();
074    
075                            if (responseProcessCallable instanceof ReturnProcessCallable<?>) {
076                                    return (T)responseProcessCallable.call();
077                            }
078    
079                            if (responseProcessCallable instanceof ExceptionProcessCallable) {
080                                    ExceptionProcessCallable exceptionProcessCallable =
081                                            (ExceptionProcessCallable)responseProcessCallable;
082    
083                                    throw exceptionProcessCallable.call();
084                            }
085    
086                            if (_log.isWarnEnabled()) {
087                                    _log.warn(
088                                            "Subprocess reactor exited without a valid return " +
089                                                    "because the subprocess terminated with an exception");
090                            }
091    
092                            return null;
093                    }
094                    catch (ProcessException pe) {
095                            throw pe;
096                    }
097                    catch (Exception e) {
098                            throw new ProcessException(e);
099                    }
100            }
101    
102            public static void main(String[] arguments)
103                    throws ClassNotFoundException, IOException {
104    
105                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
106                            System.out);
107    
108                    ProcessOutputStream outProcessOutputStream = new ProcessOutputStream(
109                            objectOutputStream, false);
110    
111                    PrintStream outPrintStream = new PrintStream(
112                            outProcessOutputStream, true);
113    
114                    System.setOut(outPrintStream);
115    
116                    ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
117                            objectOutputStream, true);
118    
119                    PrintStream errPrintStream = new PrintStream(
120                            errProcessOutputStream, true);
121    
122                    System.setErr(errPrintStream);
123    
124                    try {
125                            ProcessCallable<?> processCallable =
126                                    (ProcessCallable<?>)_readObject(System.in, false);
127    
128                            Serializable result = processCallable.call();
129    
130                            outPrintStream.flush();
131    
132                            outProcessOutputStream.writeProcessCallable(
133                                    new ReturnProcessCallable<Serializable>(result));
134    
135                            outProcessOutputStream.close();
136                    }
137                    catch (ProcessException pe) {
138                            errPrintStream.flush();
139    
140                            errProcessOutputStream.writeProcessCallable(
141                                    new ExceptionProcessCallable(pe));
142    
143                            errProcessOutputStream.close();
144                    }
145            }
146    
147            public void destroy() {
148                    if (_executorService == null) {
149                            return;
150                    }
151    
152                    synchronized (ProcessExecutor.class) {
153                            if (_executorService != null) {
154                                    _executorService.shutdownNow();
155    
156                                    _executorService = null;
157                            }
158                    }
159            }
160    
161            private static ExecutorService _getExecutorService() {
162                    if (_executorService != null) {
163                            return _executorService;
164                    }
165    
166                    synchronized (ProcessExecutor.class) {
167                            if (_executorService == null) {
168                                    _executorService = Executors.newCachedThreadPool(
169                                            new NamedThreadFactory(
170                                                    ProcessExecutor.class.getName(),
171                                                    Thread.MIN_PRIORITY,
172                                                    PortalClassLoaderUtil.getClassLoader()));
173                            }
174                    }
175    
176                    return _executorService;
177            }
178    
179            private static Object _readObject(InputStream inputStream, boolean close)
180                    throws ClassNotFoundException, IOException {
181    
182                    ObjectInputStream objectInputStream = new ObjectInputStream(
183                            inputStream);
184    
185                    try {
186                            return objectInputStream.readObject();
187                    }
188                    finally {
189                            if (close) {
190                                    objectInputStream.close();
191                            }
192                    }
193            }
194    
195            private static void _writeObject(OutputStream outputStream, Object object)
196                    throws IOException {
197    
198                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
199                            outputStream);
200    
201                    try {
202                            objectOutputStream.writeObject(object);
203                    }
204                    finally {
205                            objectOutputStream.close();
206                    }
207            }
208    
209            private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);
210    
211            private static volatile ExecutorService _executorService;
212    
213            private static class SubprocessReactor
214                    implements Callable<ProcessCallable<? extends Serializable>> {
215    
216                    public SubprocessReactor(InputStream inputStream) {
217                            _unsyncBufferedInputStream = new UnsyncBufferedInputStream(
218                                    inputStream);
219                    }
220    
221                    public ProcessCallable<? extends Serializable> call() throws Exception {
222                            try {
223                                    ObjectInputStream objectInputStream = null;
224    
225                                    UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
226                                            new UnsyncByteArrayOutputStream();
227    
228                                    while (true) {
229                                            try {
230    
231                                                    // Be ready for a bad header
232    
233                                                    _unsyncBufferedInputStream.mark(4);
234    
235                                                    objectInputStream =
236                                                            new PortalClassLoaderObjectInputStream(
237                                                                    _unsyncBufferedInputStream);
238    
239                                                    // Found the beginning of the object input stream. Flush
240                                                    // out corrupted log if necessary.
241    
242                                                    if (unsyncByteArrayOutputStream.size() > 0) {
243                                                            if (_log.isWarnEnabled()) {
244                                                                    _log.warn(
245                                                                            "Found corrupted leading log: " +
246                                                                                    unsyncByteArrayOutputStream.toString());
247                                                            }
248                                                    }
249    
250                                                    unsyncByteArrayOutputStream = null;
251    
252                                                    break;
253                                            }
254                                            catch (StreamCorruptedException sce) {
255    
256                                                    // Collecting bad header as log information
257    
258                                                    _unsyncBufferedInputStream.reset();
259    
260                                                    unsyncByteArrayOutputStream.write(
261                                                            _unsyncBufferedInputStream.read());
262                                            }
263                                    }
264    
265                                    while (true) {
266                                            ProcessCallable<?> processCallable =
267                                                    (ProcessCallable<?>)objectInputStream.readObject();
268    
269                                            if (processCallable instanceof ExceptionProcessCallable) {
270                                                    return processCallable;
271                                            }
272    
273                                            if (processCallable instanceof ReturnProcessCallable<?>) {
274                                                    return processCallable;
275                                            }
276    
277                                            Serializable result = processCallable.call();
278    
279                                            if (_log.isDebugEnabled()) {
280                                                    _log.debug(
281                                                            "Invoked generic process callable " +
282                                                                    processCallable + " with return value " +
283                                                                            result);
284                                            }
285                                    }
286                            }
287                            catch (EOFException eofe) {
288                            }
289    
290                            return null;
291                    }
292    
293                    private final UnsyncBufferedInputStream _unsyncBufferedInputStream;
294    
295            }
296    
297    }