001
014
015 package com.liferay.portal.fabric.netty.handlers;
016
017 import com.liferay.portal.fabric.agent.FabricAgent;
018 import com.liferay.portal.fabric.netty.agent.NettyFabricAgentStub;
019 import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
020 import com.liferay.portal.fabric.netty.rpc.ChannelThreadLocal;
021 import com.liferay.portal.fabric.netty.rpc.RPCUtil;
022 import com.liferay.portal.fabric.netty.rpc.SyncProcessRPCCallable;
023 import com.liferay.portal.fabric.netty.util.NettyUtil;
024 import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerConfig;
025 import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerStub;
026 import com.liferay.portal.fabric.repository.Repository;
027 import com.liferay.portal.fabric.worker.FabricWorker;
028 import com.liferay.portal.kernel.concurrent.BaseFutureListener;
029 import com.liferay.portal.kernel.concurrent.FutureListener;
030 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
031 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
032 import com.liferay.portal.kernel.log.Log;
033 import com.liferay.portal.kernel.log.LogFactoryUtil;
034 import com.liferay.portal.kernel.process.ClassPathUtil;
035 import com.liferay.portal.kernel.process.ProcessCallable;
036 import com.liferay.portal.kernel.process.ProcessConfig;
037 import com.liferay.portal.kernel.process.ProcessConfig.Builder;
038 import com.liferay.portal.kernel.process.ProcessException;
039 import com.liferay.portal.kernel.util.ArrayUtil;
040 import com.liferay.portal.kernel.util.StringBundler;
041 import com.liferay.portal.kernel.util.StringUtil;
042
043 import io.netty.channel.Channel;
044 import io.netty.channel.ChannelFuture;
045 import io.netty.channel.ChannelFutureListener;
046 import io.netty.channel.ChannelHandlerContext;
047 import io.netty.channel.SimpleChannelInboundHandler;
048 import io.netty.util.concurrent.EventExecutor;
049 import io.netty.util.concurrent.GenericFutureListener;
050
051 import java.io.File;
052 import java.io.IOException;
053 import java.io.Serializable;
054
055 import java.net.MalformedURLException;
056 import java.net.URLClassLoader;
057
058 import java.nio.file.Path;
059 import java.nio.file.Paths;
060
061 import java.util.ArrayList;
062 import java.util.HashMap;
063 import java.util.LinkedHashMap;
064 import java.util.List;
065 import java.util.Map;
066 import java.util.concurrent.Callable;
067 import java.util.concurrent.ExecutionException;
068 import java.util.concurrent.Future;
069
070
073 public class NettyFabricWorkerExecutionChannelHandler
074 extends SimpleChannelInboundHandler<NettyFabricWorkerConfig<Serializable>> {
075
076 public NettyFabricWorkerExecutionChannelHandler(
077 Repository<Channel> repository, FabricAgent fabricAgent,
078 long executionTimeout) {
079
080 if (repository == null) {
081 throw new NullPointerException("Repository is null");
082 }
083
084 if (fabricAgent == null) {
085 throw new NullPointerException("Fabric agent is null");
086 }
087
088 _repository = repository;
089 _fabricAgent = fabricAgent;
090 _executionTimeout = executionTimeout;
091 }
092
093 @Override
094 public void exceptionCaught(
095 ChannelHandlerContext channelHandlerContext, Throwable throwable) {
096
097 final Channel channel = channelHandlerContext.channel();
098
099 _log.error("Closing " + channel + " due to:", throwable);
100
101 ChannelFuture channelFuture = channel.close();
102
103 channelFuture.addListener(
104 new ChannelFutureListener() {
105
106 @Override
107 public void operationComplete(ChannelFuture channelFuture) {
108 if (_log.isInfoEnabled()) {
109 _log.info(channel + " is closed");
110 }
111 }
112
113 });
114 }
115
116 @Override
117 protected void channelRead0(
118 ChannelHandlerContext channelHandlerContext,
119 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
120
121 NoticeableFuture<LoadedPaths> noticeableFuture = loadPaths(
122 channelHandlerContext.channel(), nettyFabricWorkerConfig);
123
124 noticeableFuture.addFutureListener(
125 new PostLoadPathsFutureListener(
126 channelHandlerContext, nettyFabricWorkerConfig));
127 }
128
129 protected NoticeableFuture<LoadedPaths> loadPaths(
130 Channel channel,
131 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
132
133 Map<Path, Path> mergedPaths = new HashMap<Path, Path>();
134
135 ProcessConfig processConfig =
136 nettyFabricWorkerConfig.getProcessConfig();
137
138 final Map<Path, Path> bootstrapPaths = new LinkedHashMap<Path, Path>();
139
140 for (String pathString :
141 processConfig.getBootstrapClassPathElements()) {
142
143 bootstrapPaths.put(Paths.get(pathString), null);
144 }
145
146 mergedPaths.putAll(bootstrapPaths);
147
148 final Map<Path, Path> runtimePaths = new LinkedHashMap<Path, Path>();
149
150 for (String pathString : processConfig.getRuntimeClassPathElements()) {
151 runtimePaths.put(Paths.get(pathString), null);
152 }
153
154 mergedPaths.putAll(runtimePaths);
155
156 final Map<Path, Path> inputPaths =
157 nettyFabricWorkerConfig.getInputPathMap();
158
159 mergedPaths.putAll(inputPaths);
160
161 return new NoticeableFutureConverter<LoadedPaths, Map<Path, Path>>(
162 _repository.getFiles(channel, mergedPaths, false)) {
163
164 @Override
165 protected LoadedPaths convert(Map<Path, Path> mergedPaths)
166 throws IOException {
167
168 Map<Path, Path> loadedInputPaths = new HashMap<Path, Path>();
169
170 List<Path> missedInputPaths = new ArrayList<Path>();
171
172 for (Path path : inputPaths.keySet()) {
173 Path loadedInputPath = mergedPaths.get(path);
174
175 if (loadedInputPath == null) {
176 missedInputPaths.add(path);
177 }
178 else {
179 loadedInputPaths.put(path, loadedInputPath);
180 }
181 }
182
183 if (!missedInputPaths.isEmpty()) {
184 throw new IOException(
185 "Unable to get input paths: " + missedInputPaths);
186 }
187
188 List<Path> loadedBootstrapPaths = new ArrayList<Path>();
189
190 List<Path> missedBootstrapPaths = new ArrayList<Path>();
191
192 for (Path path : bootstrapPaths.keySet()) {
193 Path loadedBootstrapPath = mergedPaths.get(path);
194
195 if (loadedBootstrapPath == null) {
196 missedBootstrapPaths.add(path);
197 }
198 else {
199 loadedBootstrapPaths.add(loadedBootstrapPath);
200 }
201 }
202
203 if (!missedBootstrapPaths.isEmpty() && _log.isWarnEnabled()) {
204 _log.warn(
205 "Incomplete bootstrap classpath loaded, missed: " +
206 missedBootstrapPaths);
207 }
208
209 List<Path> loadedRuntimePaths = new ArrayList<Path>();
210
211 List<Path> missedRuntimePaths = new ArrayList<Path>();
212
213 for (Path path : runtimePaths.keySet()) {
214 Path loadedRuntimePath = mergedPaths.get(path);
215
216 if (loadedRuntimePath == null) {
217 missedRuntimePaths.add(path);
218 }
219 else {
220 loadedRuntimePaths.add(loadedRuntimePath);
221 }
222 }
223
224 if (!missedRuntimePaths.isEmpty() && _log.isWarnEnabled()) {
225 _log.warn(
226 "Incomplete runtime classpath loaded, missed: " +
227 missedRuntimePaths);
228 }
229
230 return new LoadedPaths(
231 loadedInputPaths,
232 StringUtil.merge(loadedBootstrapPaths, File.pathSeparator),
233 StringUtil.merge(loadedRuntimePaths, File.pathSeparator));
234 }
235 };
236 }
237
238 protected void sendResult(
239 Channel channel, long fabricWorkerId, Serializable result,
240 Throwable t) {
241
242 final FabricWorkerResultProcessCallable
243 fabricWorkerResultProcessCallable =
244 new FabricWorkerResultProcessCallable(
245 fabricWorkerId, result, t);
246
247 NoticeableFuture<Serializable> noticeableFuture = RPCUtil.execute(
248 channel,
249 new SyncProcessRPCCallable<Serializable>(
250 fabricWorkerResultProcessCallable));
251
252 NettyUtil.scheduleCancellation(
253 channel, noticeableFuture, _executionTimeout);
254
255 noticeableFuture.addFutureListener(
256 new BaseFutureListener<Serializable>() {
257
258 @Override
259 public void completeWithException(
260 Future<Serializable> future, Throwable throwable) {
261
262 _log.error(
263 "Unable to send back fabric worker result " +
264 fabricWorkerResultProcessCallable,
265 throwable);
266 }
267
268 });
269 }
270
271 protected static class FabricAgentFinishStartupProcessCallable
272 implements ProcessCallable<Serializable> {
273
274 @Override
275 public Serializable call() throws ProcessException {
276 Channel channel = ChannelThreadLocal.getChannel();
277
278 NettyFabricAgentStub nettyStubFabricAgent =
279 NettyChannelAttributes.getNettyFabricAgentStub(channel);
280
281 if (nettyStubFabricAgent == null) {
282 throw new ProcessException(
283 "Unable to locate fabric agent on channel " + channel);
284 }
285
286 nettyStubFabricAgent.finsihStartup(_id);
287
288 return null;
289 }
290
291 protected FabricAgentFinishStartupProcessCallable(long id) {
292 _id = id;
293 }
294
295 private static final long serialVersionUID = 1L;
296
297 private final long _id;
298
299 }
300
301 protected static class FabricWorkerResultProcessCallable
302 implements ProcessCallable<Serializable> {
303
304 @Override
305 public Serializable call() throws ProcessException {
306 Channel channel = ChannelThreadLocal.getChannel();
307
308 NettyFabricAgentStub nettyStubFabricAgent =
309 NettyChannelAttributes.getNettyFabricAgentStub(channel);
310
311 if (nettyStubFabricAgent == null) {
312 throw new ProcessException(
313 "Unable to locate fabric agent on channel " + channel);
314 }
315
316 NettyFabricWorkerStub<Serializable> nettyStubFabricWorker =
317 (NettyFabricWorkerStub<Serializable>)
318 nettyStubFabricAgent.takeNettyStubFabricWorker(_id);
319
320 if (nettyStubFabricWorker == null) {
321 throw new ProcessException(
322 "Unable to locate fabric worker on channel " + channel +
323 ", with fabric worker id " + _id);
324 }
325
326 if (_throwable != null) {
327 nettyStubFabricWorker.setException(_throwable);
328 }
329 else {
330 nettyStubFabricWorker.setResult(_result);
331 }
332
333 return null;
334 }
335
336 @Override
337 public String toString() {
338 StringBundler sb = new StringBundler(7);
339
340 sb.append("{id=");
341 sb.append(_id);
342 sb.append(", result=");
343 sb.append(_result);
344 sb.append(", throwable=");
345 sb.append(_throwable);
346 sb.append("}");
347
348 return sb.toString();
349 }
350
351 protected FabricWorkerResultProcessCallable(
352 long id, Serializable result, Throwable throwable) {
353
354 _id = id;
355 _result = result;
356 _throwable = throwable;
357 }
358
359 private static final long serialVersionUID = 1L;
360
361 private final long _id;
362 private final Serializable _result;
363 private final Throwable _throwable;
364
365 }
366
367 protected static class LoadedPaths {
368
369 public LoadedPaths(
370 Map<Path, Path> inputPaths, String bootstrapClassPath,
371 String runtimeClassPath) {
372
373 _inputPaths = inputPaths;
374 _bootstrapClassPath = bootstrapClassPath;
375 _runtimeClassPath = runtimeClassPath;
376 }
377
378 public Map<Path, Path> getInputPaths() {
379 return _inputPaths;
380 }
381
382 public ProcessConfig toProcessConfig(ProcessConfig processConfig)
383 throws ProcessException {
384
385 Builder builder = new Builder();
386
387 builder.setArguments(processConfig.getArguments());
388 builder.setBootstrapClassPath(_bootstrapClassPath);
389 builder.setJavaExecutable(processConfig.getJavaExecutable());
390 builder.setRuntimeClassPath(_runtimeClassPath);
391
392 try {
393 builder.setReactClassLoader(
394 new URLClassLoader(
395 ArrayUtil.append(
396 ClassPathUtil.getClassPathURLs(_bootstrapClassPath),
397 ClassPathUtil.getClassPathURLs(
398 _runtimeClassPath))));
399 }
400 catch (MalformedURLException murle) {
401 throw new ProcessException(murle);
402 }
403
404 return builder.build();
405 }
406
407 private final String _bootstrapClassPath;
408 private final Map<Path, Path> _inputPaths;
409 private final String _runtimeClassPath;
410
411 }
412
413 protected class PostFabricWorkerExecutionFutureListener
414 implements GenericFutureListener
415 <io.netty.util.concurrent.Future<FabricWorker<Serializable>>> {
416
417 public PostFabricWorkerExecutionFutureListener(
418 Channel channel, LoadedPaths loadedPaths,
419 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
420
421 _channel = channel;
422 _loadedPaths = loadedPaths;
423 _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
424 }
425
426 @Override
427 public void operationComplete(
428 io.netty.util.concurrent.Future<FabricWorker<Serializable>>
429 future)
430 throws Exception {
431
432 Throwable throwable = future.cause();
433
434 if (throwable != null) {
435 sendResult(
436 _channel, _nettyFabricWorkerConfig.getId(), null,
437 throwable);
438
439 return;
440 }
441
442 FabricWorker<Serializable> fabricWorker = future.get();
443
444 NettyChannelAttributes.putFabricWorker(
445 _channel, _nettyFabricWorkerConfig.getId(), fabricWorker);
446
447 NoticeableFuture<Serializable> noticeableFuture = RPCUtil.execute(
448 _channel,
449 new SyncProcessRPCCallable<Serializable>(
450 new FabricAgentFinishStartupProcessCallable(
451 _nettyFabricWorkerConfig.getId())));
452
453 NettyUtil.scheduleCancellation(
454 _channel, noticeableFuture, _executionTimeout);
455
456 noticeableFuture.addFutureListener(
457 new BaseFutureListener<Serializable>() {
458
459 @Override
460 public void completeWithException(
461 Future<Serializable> future, Throwable throwable) {
462
463 _log.error(
464 "Unable to finish fabric worker startup",
465 throwable);
466 }
467
468 });
469
470 NoticeableFuture<Serializable> processNoticeableFuture =
471 fabricWorker.getProcessNoticeableFuture();
472
473 processNoticeableFuture.addFutureListener(
474 new PostFabricWorkerFinishFutureListener(
475 _channel, _nettyFabricWorkerConfig, _loadedPaths));
476 }
477
478 private final Channel _channel;
479 private final LoadedPaths _loadedPaths;
480 private final NettyFabricWorkerConfig<Serializable>
481 _nettyFabricWorkerConfig;
482
483 }
484
485 protected class PostFabricWorkerFinishFutureListener
486 implements FutureListener<Serializable> {
487
488 public PostFabricWorkerFinishFutureListener(
489 Channel channel,
490 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig,
491 LoadedPaths loadedPaths) {
492
493 _channel = channel;
494 _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
495 _loadedPaths = loadedPaths;
496 }
497
498 @Override
499 public void complete(Future<Serializable> future) {
500 Map<Path, Path> inputPaths = _loadedPaths.getInputPaths();
501
502 for (Path path : inputPaths.values()) {
503 FileHelperUtil.delete(true, path);
504 }
505
506 try {
507 sendResult(
508 _channel, _nettyFabricWorkerConfig.getId(), future.get(),
509 null);
510 }
511 catch (Throwable t) {
512 if (t instanceof ExecutionException) {
513 t = t.getCause();
514 }
515
516 sendResult(_channel, _nettyFabricWorkerConfig.getId(), null, t);
517 }
518 }
519
520 private final Channel _channel;
521 private final LoadedPaths _loadedPaths;
522 private final NettyFabricWorkerConfig<Serializable>
523 _nettyFabricWorkerConfig;
524
525 }
526
527 protected class PostLoadPathsFutureListener
528 extends BaseFutureListener<LoadedPaths> {
529
530 public PostLoadPathsFutureListener(
531 ChannelHandlerContext channelHandlerContext,
532 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
533
534 _channelHandlerContext = channelHandlerContext;
535 _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
536 }
537
538 @Override
539 public void completeWithException(
540 Future<LoadedPaths> future, Throwable throwable) {
541
542 sendResult(
543 _channelHandlerContext.channel(),
544 _nettyFabricWorkerConfig.getId(), null, throwable);
545 }
546
547 @Override
548 public void completeWithResult(
549 Future<LoadedPaths> loadPathsFuture,
550 final LoadedPaths loadedPaths) {
551
552 EventExecutor eventExecutor = _channelHandlerContext.executor();
553
554 io.netty.util.concurrent.Future<FabricWorker<Serializable>> future =
555 eventExecutor.submit(
556 new Callable<FabricWorker<Serializable>>() {
557
558 @Override
559 public FabricWorker<Serializable> call()
560 throws ProcessException {
561
562 ProcessConfig processConfig =
563 _nettyFabricWorkerConfig.getProcessConfig();
564
565 return _fabricAgent.execute(
566 loadedPaths.toProcessConfig(processConfig),
567 _nettyFabricWorkerConfig.getProcessCallable());
568 }
569
570 });
571
572 future.addListener(
573 new PostFabricWorkerExecutionFutureListener(
574 _channelHandlerContext.channel(), loadedPaths,
575 _nettyFabricWorkerConfig));
576 }
577
578 private final ChannelHandlerContext _channelHandlerContext;
579 private final NettyFabricWorkerConfig<Serializable>
580 _nettyFabricWorkerConfig;
581
582 }
583
584 private static final Log _log = LogFactoryUtil.getLog(
585 NettyFabricWorkerExecutionChannelHandler.class);
586
587 private final long _executionTimeout;
588 private final FabricAgent _fabricAgent;
589 private final Repository<Channel> _repository;
590
591 }