001
014
015 package com.liferay.portal.kernel.process.local;
016
017 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedOutputStream;
018 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
019 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
020 import com.liferay.portal.kernel.process.ClassPathUtil;
021 import com.liferay.portal.kernel.process.ProcessCallable;
022 import com.liferay.portal.kernel.process.ProcessException;
023 import com.liferay.portal.kernel.process.log.ProcessOutputStream;
024 import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
025 import com.liferay.portal.kernel.util.StringPool;
026
027 import java.io.FileDescriptor;
028 import java.io.FileOutputStream;
029 import java.io.IOException;
030 import java.io.ObjectInputStream;
031 import java.io.ObjectOutputStream;
032 import java.io.PrintStream;
033 import java.io.Serializable;
034
035 import java.net.URLClassLoader;
036
037 import java.util.concurrent.ConcurrentHashMap;
038 import java.util.concurrent.ConcurrentMap;
039 import java.util.concurrent.atomic.AtomicReference;
040
041
044 public class LocalProcessLauncher {
045
046 public static void main(String[] arguments) throws IOException {
047 PrintStream oldOutPrintStream = System.out;
048
049 ObjectOutputStream objectOutputStream = null;
050 ProcessOutputStream outProcessOutputStream = null;
051
052 synchronized (oldOutPrintStream) {
053 oldOutPrintStream.flush();
054
055 FileOutputStream fileOutputStream = new FileOutputStream(
056 FileDescriptor.out);
057
058 objectOutputStream = new ObjectOutputStream(
059 new UnsyncBufferedOutputStream(fileOutputStream));
060
061 outProcessOutputStream = new ProcessOutputStream(
062 objectOutputStream, false);
063
064 ProcessContext._setProcessOutputStream(outProcessOutputStream);
065
066 PrintStream newOutPrintStream = new PrintStream(
067 outProcessOutputStream, true);
068
069 System.setOut(newOutPrintStream);
070 }
071
072 ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
073 objectOutputStream, true);
074
075 PrintStream errPrintStream = new PrintStream(
076 errProcessOutputStream, true);
077
078 System.setErr(errPrintStream);
079
080 Thread currentThread = Thread.currentThread();
081
082 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
083
084 try {
085 ObjectInputStream bootstrapObjectInputStream =
086 new ObjectInputStream(System.in);
087
088 String processCallableName =
089 (String)bootstrapObjectInputStream.readObject();
090
091 String logPrefixString =
092 StringPool.OPEN_BRACKET.concat(processCallableName).concat(
093 StringPool.CLOSE_BRACKET);
094
095 byte[] logPrefix = logPrefixString.getBytes(StringPool.UTF8);
096
097 outProcessOutputStream.setLogPrefix(logPrefix);
098 errProcessOutputStream.setLogPrefix(logPrefix);
099
100 String classPath = (String)bootstrapObjectInputStream.readObject();
101
102 ClassLoader classLoader = new URLClassLoader(
103 ClassPathUtil.getClassPathURLs(classPath));
104
105 currentThread.setContextClassLoader(classLoader);
106
107 ObjectInputStream objectInputStream =
108 new ClassLoaderObjectInputStream(
109 bootstrapObjectInputStream, classLoader);
110
111 ProcessCallable<?> processCallable =
112 (ProcessCallable<?>)objectInputStream.readObject();
113
114 Thread thread = new Thread(
115 new ProcessCallableRunner(objectInputStream),
116 "ProcessCallable-Runner");
117
118 thread.setDaemon(true);
119
120 thread.start();
121
122 Serializable result = processCallable.call();
123
124 System.out.flush();
125
126 outProcessOutputStream.writeProcessCallable(
127 new ReturnProcessCallable<Serializable>(result));
128
129 outProcessOutputStream.flush();
130 }
131 catch (Throwable t) {
132 errPrintStream.flush();
133
134 ProcessException processException = null;
135
136 if (t instanceof ProcessException) {
137 processException = (ProcessException)t;
138 }
139 else {
140 processException = new ProcessException(t);
141 }
142
143 errProcessOutputStream.writeProcessCallable(
144 new ExceptionProcessCallable(processException));
145
146 errProcessOutputStream.flush();
147 }
148 finally {
149 currentThread.setContextClassLoader(contextClassLoader);
150 }
151 }
152
153 public static class ProcessContext {
154
155 public static boolean attach(
156 String message, long interval, ShutdownHook shutdownHook) {
157
158 HeartbeatThread heartbeatThread = new HeartbeatThread(
159 message, interval, shutdownHook);
160
161 boolean value = _heartbeatThreadReference.compareAndSet(
162 null, heartbeatThread);
163
164 if (value) {
165 heartbeatThread.start();
166 }
167
168 return value;
169 }
170
171 public static void detach() throws InterruptedException {
172 HeartbeatThread heartbeatThread =
173 _heartbeatThreadReference.getAndSet(null);
174
175 if (heartbeatThread != null) {
176 heartbeatThread.detach();
177 heartbeatThread.join();
178 }
179 }
180
181 public static ConcurrentMap<String, Object> getAttributes() {
182 return _attributes;
183 }
184
185 public static ProcessOutputStream getProcessOutputStream() {
186 return _processOutputStream;
187 }
188
189 public static boolean isAttached() {
190 HeartbeatThread attachThread = _heartbeatThreadReference.get();
191
192 if (attachThread != null) {
193 return true;
194 }
195 else {
196 return false;
197 }
198 }
199
200 private static void _setProcessOutputStream(
201 ProcessOutputStream processOutputStream) {
202
203 _processOutputStream = processOutputStream;
204 }
205
206 private ProcessContext() {
207 }
208
209 private static final ConcurrentMap<String, Object> _attributes =
210 new ConcurrentHashMap<>();
211 private static final AtomicReference<HeartbeatThread>
212 _heartbeatThreadReference = new AtomicReference<>();
213 private static ProcessOutputStream _processOutputStream;
214
215 }
216
/**
 * Callback invoked by the heartbeat thread when a heartbeat cycle fails.
 */
public interface ShutdownHook {

    /**
     * Passed when the heartbeat failed with an <code>IOException</code>.
     */
    int BROKEN_PIPE_CODE = 1;

    /**
     * Passed when the heartbeat thread was interrupted without having been
     * asked to detach.
     */
    int INTERRUPTION_CODE = 2;

    /**
     * Passed when the heartbeat failed with any other throwable.
     */
    int UNKNOWN_CODE = 3;

    /**
     * Reacts to a heartbeat failure.
     *
     * @param  shutdownCode one of the <code>*_CODE</code> constants
     * @param  shutdownThrowable the throwable that caused the failure
     * @return <code>true</code> to stop the heartbeat thread;
     *         <code>false</code> to keep it running
     */
    boolean shutdown(int shutdownCode, Throwable shutdownThrowable);

}
228
229 private static class HeartbeatThread extends Thread {
230
231 public HeartbeatThread(
232 String message, long interval, ShutdownHook shutdownHook) {
233
234 if (shutdownHook == null) {
235 throw new IllegalArgumentException("Shutdown hook is null");
236 }
237
238 _interval = interval;
239 _shutdownHook = shutdownHook;
240
241 _pringBackProcessCallable = new PingbackProcessCallable(message);
242
243 setDaemon(true);
244 setName(HeartbeatThread.class.getSimpleName());
245 }
246
247 public void detach() {
248 _detach = true;
249
250 interrupt();
251 }
252
253 @Override
254 public void run() {
255 ProcessOutputStream processOutputStream =
256 ProcessContext.getProcessOutputStream();
257
258 int shutdownCode = 0;
259 Throwable shutdownThrowable = null;
260
261 while (!_detach) {
262 try {
263 sleep(_interval);
264
265 processOutputStream.writeProcessCallable(
266 _pringBackProcessCallable);
267 }
268 catch (InterruptedException ie) {
269 if (_detach) {
270 return;
271 }
272 else {
273 shutdownThrowable = ie;
274
275 shutdownCode = ShutdownHook.INTERRUPTION_CODE;
276 }
277 }
278 catch (IOException ioe) {
279 shutdownThrowable = ioe;
280
281 shutdownCode = ShutdownHook.BROKEN_PIPE_CODE;
282 }
283 catch (Throwable throwable) {
284 shutdownThrowable = throwable;
285
286 shutdownCode = ShutdownHook.UNKNOWN_CODE;
287 }
288
289 if (shutdownCode != 0) {
290 _detach = _shutdownHook.shutdown(
291 shutdownCode, shutdownThrowable);
292 }
293 }
294 }
295
296 private volatile boolean _detach;
297 private final long _interval;
298 private final ProcessCallable<String> _pringBackProcessCallable;
299 private final ShutdownHook _shutdownHook;
300
301 }
302
303 private static class PingbackProcessCallable
304 implements ProcessCallable<String> {
305
306 public PingbackProcessCallable(String message) {
307 _message = message;
308 }
309
310 @Override
311 public String call() {
312 return _message;
313 }
314
315 private static final long serialVersionUID = 1L;
316
317 private final String _message;
318
319 }
320
321 private static class ProcessCallableRunner implements Runnable {
322
323 public ProcessCallableRunner(ObjectInputStream objectInputStream) {
324 _objectInputStream = objectInputStream;
325 }
326
327 @Override
328 public void run() {
329 while (true) {
330 try {
331 ProcessCallable<?> processCallable =
332 (ProcessCallable<?>)_objectInputStream.readObject();
333
334 processCallable.call();
335 }
336 catch (Exception e) {
337 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
338 new UnsyncByteArrayOutputStream();
339
340 UnsyncPrintWriter unsyncPrintWriter = new UnsyncPrintWriter(
341 unsyncByteArrayOutputStream);
342
343 unsyncPrintWriter.println(e);
344
345 e.printStackTrace(unsyncPrintWriter);
346
347 unsyncPrintWriter.println();
348
349 unsyncPrintWriter.flush();
350
351 System.err.write(
352 unsyncByteArrayOutputStream.unsafeGetByteArray(), 0,
353 unsyncByteArrayOutputStream.size());
354 System.err.flush();
355 }
356 }
357 }
358
359 private final ObjectInputStream _objectInputStream;
360
361 }
362
363 }