001
014
015 package com.liferay.portal.kernel.process;
016
017 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
018 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
019 import com.liferay.portal.kernel.log.Log;
020 import com.liferay.portal.kernel.log.LogFactoryUtil;
021 import com.liferay.portal.kernel.process.log.ProcessOutputStream;
022 import com.liferay.portal.kernel.util.NamedThreadFactory;
023 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
024
025 import java.io.EOFException;
026 import java.io.IOException;
027 import java.io.InputStream;
028 import java.io.ObjectInputStream;
029 import java.io.ObjectOutputStream;
030 import java.io.OutputStream;
031 import java.io.PrintStream;
032 import java.io.Serializable;
033 import java.io.StreamCorruptedException;
034
035 import java.util.concurrent.Callable;
036 import java.util.concurrent.ExecutorService;
037 import java.util.concurrent.Executors;
038 import java.util.concurrent.Future;
039
040
043 public class ProcessExecutor {
044
045 public static <T extends Serializable> T execute(
046 ProcessCallable<T> processCallable, String classPath)
047 throws ProcessException {
048
049 try {
050 ProcessBuilder processBuilder = new ProcessBuilder(
051 "java", "-cp", classPath, ProcessExecutor.class.getName());
052
053 Process process = processBuilder.start();
054
055 _writeObject(process.getOutputStream(), processCallable);
056
057 ExecutorService executorService = _getExecutorService();
058
059 SubprocessReactor subprocessReactor = new SubprocessReactor(
060 process.getInputStream());
061
062 Future<ProcessCallable<?>> futureResponseProcessCallable =
063 executorService.submit(subprocessReactor);
064
065 int exitCode = process.waitFor();
066
067 if (exitCode != 0) {
068 throw new ProcessException(
069 "Subprocess terminated with exit code " + exitCode);
070 }
071
072 ProcessCallable<?> responseProcessCallable =
073 futureResponseProcessCallable.get();
074
075 if (responseProcessCallable instanceof ReturnProcessCallable<?>) {
076 return (T)responseProcessCallable.call();
077 }
078
079 if (responseProcessCallable instanceof ExceptionProcessCallable) {
080 ExceptionProcessCallable exceptionProcessCallable =
081 (ExceptionProcessCallable)responseProcessCallable;
082
083 throw exceptionProcessCallable.call();
084 }
085
086 if (_log.isWarnEnabled()) {
087 _log.warn(
088 "Subprocess reactor exited without a valid return " +
089 "because the subprocess terminated with an exception");
090 }
091
092 return null;
093 }
094 catch (ProcessException pe) {
095 throw pe;
096 }
097 catch (Exception e) {
098 throw new ProcessException(e);
099 }
100 }
101
102 public static void main(String[] arguments)
103 throws ClassNotFoundException, IOException {
104
105 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
106 System.out);
107
108 ProcessOutputStream outProcessOutputStream = new ProcessOutputStream(
109 objectOutputStream, false);
110
111 PrintStream outPrintStream = new PrintStream(
112 outProcessOutputStream, true);
113
114 System.setOut(outPrintStream);
115
116 ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
117 objectOutputStream, true);
118
119 PrintStream errPrintStream = new PrintStream(
120 errProcessOutputStream, true);
121
122 System.setErr(errPrintStream);
123
124 try {
125 ProcessCallable<?> processCallable =
126 (ProcessCallable<?>)_readObject(System.in, false);
127
128 Serializable result = processCallable.call();
129
130 outPrintStream.flush();
131
132 outProcessOutputStream.writeProcessCallable(
133 new ReturnProcessCallable<Serializable>(result));
134
135 outProcessOutputStream.close();
136 }
137 catch (ProcessException pe) {
138 errPrintStream.flush();
139
140 errProcessOutputStream.writeProcessCallable(
141 new ExceptionProcessCallable(pe));
142
143 errProcessOutputStream.close();
144 }
145 }
146
147 public void destroy() {
148 if (_executorService == null) {
149 return;
150 }
151
152 synchronized (ProcessExecutor.class) {
153 if (_executorService != null) {
154 _executorService.shutdownNow();
155
156 _executorService = null;
157 }
158 }
159 }
160
161 private static ExecutorService _getExecutorService() {
162 if (_executorService != null) {
163 return _executorService;
164 }
165
166 synchronized (ProcessExecutor.class) {
167 if (_executorService == null) {
168 _executorService = Executors.newCachedThreadPool(
169 new NamedThreadFactory(
170 ProcessExecutor.class.getName(),
171 Thread.MIN_PRIORITY,
172 PortalClassLoaderUtil.getClassLoader()));
173 }
174 }
175
176 return _executorService;
177 }
178
179 private static Object _readObject(InputStream inputStream, boolean close)
180 throws ClassNotFoundException, IOException {
181
182 ObjectInputStream objectInputStream = new ObjectInputStream(
183 inputStream);
184
185 try {
186 return objectInputStream.readObject();
187 }
188 finally {
189 if (close) {
190 objectInputStream.close();
191 }
192 }
193 }
194
195 private static void _writeObject(OutputStream outputStream, Object object)
196 throws IOException {
197
198 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
199 outputStream);
200
201 try {
202 objectOutputStream.writeObject(object);
203 }
204 finally {
205 objectOutputStream.close();
206 }
207 }
208
209 private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);
210
211 private static volatile ExecutorService _executorService;
212
213 private static class SubprocessReactor
214 implements Callable<ProcessCallable<? extends Serializable>> {
215
216 public SubprocessReactor(InputStream inputStream) {
217 _unsyncBufferedInputStream = new UnsyncBufferedInputStream(
218 inputStream);
219 }
220
221 public ProcessCallable<? extends Serializable> call() throws Exception {
222 try {
223 ObjectInputStream objectInputStream = null;
224
225 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
226 new UnsyncByteArrayOutputStream();
227
228 while (true) {
229 try {
230
231
232
233 _unsyncBufferedInputStream.mark(4);
234
235 objectInputStream =
236 new PortalClassLoaderObjectInputStream(
237 _unsyncBufferedInputStream);
238
239
240
241
242 if (unsyncByteArrayOutputStream.size() > 0) {
243 if (_log.isWarnEnabled()) {
244 _log.warn(
245 "Found corrupted leading log: " +
246 unsyncByteArrayOutputStream.toString());
247 }
248 }
249
250 unsyncByteArrayOutputStream = null;
251
252 break;
253 }
254 catch (StreamCorruptedException sce) {
255
256
257
258 _unsyncBufferedInputStream.reset();
259
260 unsyncByteArrayOutputStream.write(
261 _unsyncBufferedInputStream.read());
262 }
263 }
264
265 while (true) {
266 ProcessCallable<?> processCallable =
267 (ProcessCallable<?>)objectInputStream.readObject();
268
269 if (processCallable instanceof ExceptionProcessCallable) {
270 return processCallable;
271 }
272
273 if (processCallable instanceof ReturnProcessCallable<?>) {
274 return processCallable;
275 }
276
277 Serializable result = processCallable.call();
278
279 if (_log.isDebugEnabled()) {
280 _log.debug(
281 "Invoked generic process callable " +
282 processCallable + " with return value " +
283 result);
284 }
285 }
286 }
287 catch (EOFException eofe) {
288 }
289
290 return null;
291 }
292
293 private final UnsyncBufferedInputStream _unsyncBufferedInputStream;
294
295 }
296
297 }