001
014
015 package com.liferay.portal.kernel.process.local;
016
import com.liferay.portal.kernel.io.unsync.UnsyncBufferedOutputStream;
import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
import com.liferay.portal.kernel.process.ClassPathUtil;
import com.liferay.portal.kernel.process.ProcessCallable;
import com.liferay.portal.kernel.process.ProcessException;
import com.liferay.portal.kernel.process.log.ProcessOutputStream;
import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
import com.liferay.portal.kernel.util.StringPool;

import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.Serializable;

import java.net.URLClassLoader;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
040
041
044 public class LocalProcessLauncher {
045
046 public static void main(String[] arguments)
047 throws ClassNotFoundException, IOException {
048
049 PrintStream oldOutPrintStream = System.out;
050
051 ObjectOutputStream objectOutputStream = null;
052 ProcessOutputStream outProcessOutputStream = null;
053
054 synchronized (oldOutPrintStream) {
055 oldOutPrintStream.flush();
056
057 FileOutputStream fileOutputStream = new FileOutputStream(
058 FileDescriptor.out);
059
060 objectOutputStream = new ObjectOutputStream(
061 new UnsyncBufferedOutputStream(fileOutputStream));
062
063 outProcessOutputStream = new ProcessOutputStream(
064 objectOutputStream, false);
065
066 ProcessContext._setProcessOutputStream(outProcessOutputStream);
067
068 PrintStream newOutPrintStream = new PrintStream(
069 outProcessOutputStream, true);
070
071 System.setOut(newOutPrintStream);
072 }
073
074 ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
075 objectOutputStream, true);
076
077 PrintStream errPrintStream = new PrintStream(
078 errProcessOutputStream, true);
079
080 System.setErr(errPrintStream);
081
082 Thread currentThread = Thread.currentThread();
083
084 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
085
086 try {
087 ObjectInputStream bootstrapObjectInputStream =
088 new ObjectInputStream(System.in);
089
090 String processCallableName =
091 (String)bootstrapObjectInputStream.readObject();
092
093 String logPrefixString =
094 StringPool.OPEN_BRACKET.concat(processCallableName).concat(
095 StringPool.CLOSE_BRACKET);
096
097 byte[] logPrefix = logPrefixString.getBytes(StringPool.UTF8);
098
099 outProcessOutputStream.setLogPrefix(logPrefix);
100 errProcessOutputStream.setLogPrefix(logPrefix);
101
102 String classPath = (String)bootstrapObjectInputStream.readObject();
103
104 ClassLoader classLoader = new URLClassLoader(
105 ClassPathUtil.getClassPathURLs(classPath));
106
107 currentThread.setContextClassLoader(classLoader);
108
109 ObjectInputStream objectInputStream =
110 new ClassLoaderObjectInputStream(
111 bootstrapObjectInputStream, classLoader);
112
113 ProcessCallable<?> processCallable =
114 (ProcessCallable<?>)objectInputStream.readObject();
115
116 Thread thread = new Thread(
117 new ProcessCallableRunner(objectInputStream),
118 "ProcessCallable-Runner");
119
120 thread.setDaemon(true);
121
122 thread.start();
123
124 Serializable result = processCallable.call();
125
126 System.out.flush();
127
128 outProcessOutputStream.writeProcessCallable(
129 new ReturnProcessCallable<Serializable>(result));
130
131 outProcessOutputStream.flush();
132 }
133 catch (ProcessException pe) {
134 errPrintStream.flush();
135
136 errProcessOutputStream.writeProcessCallable(
137 new ExceptionProcessCallable(pe));
138
139 errProcessOutputStream.flush();
140 }
141 finally {
142 currentThread.setContextClassLoader(contextClassLoader);
143 }
144 }
145
146 public static class ProcessContext {
147
148 public static boolean attach(
149 String message, long interval, ShutdownHook shutdownHook) {
150
151 HeartbeatThread heartbeatThread = new HeartbeatThread(
152 message, interval, shutdownHook);
153
154 boolean value = _heartbeatThreadReference.compareAndSet(
155 null, heartbeatThread);
156
157 if (value) {
158 heartbeatThread.start();
159 }
160
161 return value;
162 }
163
164 public static void detach() throws InterruptedException {
165 HeartbeatThread heartbeatThread =
166 _heartbeatThreadReference.getAndSet(null);
167
168 if (heartbeatThread != null) {
169 heartbeatThread.detach();
170 heartbeatThread.join();
171 }
172 }
173
174 public static ConcurrentMap<String, Object> getAttributes() {
175 return _attributes;
176 }
177
178 public static ProcessOutputStream getProcessOutputStream() {
179 return _processOutputStream;
180 }
181
182 public static boolean isAttached() {
183 HeartbeatThread attachThread = _heartbeatThreadReference.get();
184
185 if (attachThread != null) {
186 return true;
187 }
188 else {
189 return false;
190 }
191 }
192
193 private static void _setProcessOutputStream(
194 ProcessOutputStream processOutputStream) {
195
196 _processOutputStream = processOutputStream;
197 }
198
199 private ProcessContext() {
200 }
201
202 private static final ConcurrentMap<String, Object> _attributes =
203 new ConcurrentHashMap<String, Object>();
204 private static final AtomicReference<HeartbeatThread>
205 _heartbeatThreadReference = new AtomicReference<HeartbeatThread>();
206 private static ProcessOutputStream _processOutputStream;
207
208 }
209
/**
 * Callback notified by the heartbeat thread when a heartbeat cannot be
 * delivered.
 */
public interface ShutdownHook {

    /**
     * Passed when writing the heartbeat failed with an
     * {@code IOException}.
     */
    int BROKEN_PIPE_CODE = 1;

    /**
     * Passed when the heartbeat thread was interrupted without having been
     * asked to detach.
     */
    int INTERRUPTION_CODE = 2;

    /**
     * Passed when the heartbeat failed with any other throwable.
     */
    int UNKNOWN_CODE = 3;

    /**
     * Handles a heartbeat failure.
     *
     * @param  shutdownCode one of {@link #BROKEN_PIPE_CODE},
     *         {@link #INTERRUPTION_CODE} or {@link #UNKNOWN_CODE}
     * @param  shutdownThrowable the throwable that caused the failure
     * @return <code>true</code> to make the heartbeat thread stop;
     *         <code>false</code> to let it keep running
     */
    boolean shutdown(int shutdownCode, Throwable shutdownThrowable);

}
221
222 private static class HeartbeatThread extends Thread {
223
224 public HeartbeatThread(
225 String message, long interval, ShutdownHook shutdownHook) {
226
227 if (shutdownHook == null) {
228 throw new IllegalArgumentException("Shutdown hook is null");
229 }
230
231 _interval = interval;
232 _shutdownHook = shutdownHook;
233
234 _pringBackProcessCallable = new PingbackProcessCallable(message);
235
236 setDaemon(true);
237 setName(HeartbeatThread.class.getSimpleName());
238 }
239
240 public void detach() {
241 _detach = true;
242
243 interrupt();
244 }
245
246 @Override
247 public void run() {
248 ProcessOutputStream processOutputStream =
249 ProcessContext.getProcessOutputStream();
250
251 int shutdownCode = 0;
252 Throwable shutdownThrowable = null;
253
254 while (!_detach) {
255 try {
256 sleep(_interval);
257
258 processOutputStream.writeProcessCallable(
259 _pringBackProcessCallable);
260 }
261 catch (InterruptedException ie) {
262 if (_detach) {
263 return;
264 }
265 else {
266 shutdownThrowable = ie;
267
268 shutdownCode = ShutdownHook.INTERRUPTION_CODE;
269 }
270 }
271 catch (IOException ioe) {
272 shutdownThrowable = ioe;
273
274 shutdownCode = ShutdownHook.BROKEN_PIPE_CODE;
275 }
276 catch (Throwable throwable) {
277 shutdownThrowable = throwable;
278
279 shutdownCode = ShutdownHook.UNKNOWN_CODE;
280 }
281
282 if (shutdownCode != 0) {
283 _detach = _shutdownHook.shutdown(
284 shutdownCode, shutdownThrowable);
285 }
286 }
287 }
288
289 private volatile boolean _detach;
290 private final long _interval;
291 private final ProcessCallable<String> _pringBackProcessCallable;
292 private final ShutdownHook _shutdownHook;
293
294 }
295
296 private static class PingbackProcessCallable
297 implements ProcessCallable<String> {
298
299 public PingbackProcessCallable(String message) {
300 _message = message;
301 }
302
303 @Override
304 public String call() {
305 return _message;
306 }
307
308 private static final long serialVersionUID = 1L;
309
310 private final String _message;
311
312 }
313
314 private static class ProcessCallableRunner implements Runnable {
315
316 public ProcessCallableRunner(ObjectInputStream objectInputStream) {
317 _objectInputStream = objectInputStream;
318 }
319
320 @Override
321 public void run() {
322 while (true) {
323 try {
324 ProcessCallable<?> processCallable =
325 (ProcessCallable<?>)_objectInputStream.readObject();
326
327 processCallable.call();
328 }
329 catch (Exception e) {
330 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
331 new UnsyncByteArrayOutputStream();
332
333 UnsyncPrintWriter unsyncPrintWriter = new UnsyncPrintWriter(
334 unsyncByteArrayOutputStream);
335
336 unsyncPrintWriter.println(e);
337
338 e.printStackTrace(unsyncPrintWriter);
339
340 unsyncPrintWriter.println();
341
342 unsyncPrintWriter.flush();
343
344 System.err.write(
345 unsyncByteArrayOutputStream.unsafeGetByteArray(), 0,
346 unsyncByteArrayOutputStream.size());
347 System.err.flush();
348 }
349 }
350 }
351
352 private final ObjectInputStream _objectInputStream;
353
354 }
355
356 }