001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.handlers;
016    
017    import com.liferay.portal.fabric.agent.FabricAgent;
018    import com.liferay.portal.fabric.netty.agent.NettyFabricAgentStub;
019    import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
020    import com.liferay.portal.fabric.netty.rpc.ChannelThreadLocal;
021    import com.liferay.portal.fabric.netty.rpc.RPCUtil;
022    import com.liferay.portal.fabric.netty.rpc.SyncProcessRPCCallable;
023    import com.liferay.portal.fabric.netty.util.NettyUtil;
024    import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerConfig;
025    import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerStub;
026    import com.liferay.portal.fabric.repository.Repository;
027    import com.liferay.portal.fabric.worker.FabricWorker;
028    import com.liferay.portal.kernel.concurrent.BaseFutureListener;
029    import com.liferay.portal.kernel.concurrent.FutureListener;
030    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
031    import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
032    import com.liferay.portal.kernel.log.Log;
033    import com.liferay.portal.kernel.log.LogFactoryUtil;
034    import com.liferay.portal.kernel.process.ClassPathUtil;
035    import com.liferay.portal.kernel.process.ProcessCallable;
036    import com.liferay.portal.kernel.process.ProcessConfig;
037    import com.liferay.portal.kernel.process.ProcessConfig.Builder;
038    import com.liferay.portal.kernel.process.ProcessException;
039    import com.liferay.portal.kernel.util.ArrayUtil;
040    import com.liferay.portal.kernel.util.StringBundler;
041    import com.liferay.portal.kernel.util.StringUtil;
042    
043    import io.netty.channel.Channel;
044    import io.netty.channel.ChannelFuture;
045    import io.netty.channel.ChannelFutureListener;
046    import io.netty.channel.ChannelHandlerContext;
047    import io.netty.channel.SimpleChannelInboundHandler;
048    import io.netty.util.concurrent.EventExecutor;
049    import io.netty.util.concurrent.GenericFutureListener;
050    
051    import java.io.File;
052    import java.io.IOException;
053    import java.io.Serializable;
054    
055    import java.net.MalformedURLException;
056    import java.net.URLClassLoader;
057    
058    import java.nio.file.Path;
059    import java.nio.file.Paths;
060    
061    import java.util.ArrayList;
062    import java.util.HashMap;
063    import java.util.LinkedHashMap;
064    import java.util.List;
065    import java.util.Map;
066    import java.util.concurrent.Callable;
067    import java.util.concurrent.ExecutionException;
068    import java.util.concurrent.Future;
069    
070    /**
071     * @author Shuyang Zhou
072     */
073    public class NettyFabricWorkerExecutionChannelHandler
074            extends SimpleChannelInboundHandler<NettyFabricWorkerConfig<Serializable>> {
075    
076            public NettyFabricWorkerExecutionChannelHandler(
077                    Repository<Channel> repository, FabricAgent fabricAgent,
078                    long executionTimeout) {
079    
080                    if (repository == null) {
081                            throw new NullPointerException("Repository is null");
082                    }
083    
084                    if (fabricAgent == null) {
085                            throw new NullPointerException("Fabric agent is null");
086                    }
087    
088                    _repository = repository;
089                    _fabricAgent = fabricAgent;
090                    _executionTimeout = executionTimeout;
091            }
092    
093            @Override
094            public void exceptionCaught(
095                    ChannelHandlerContext channelHandlerContext, Throwable throwable) {
096    
097                    final Channel channel = channelHandlerContext.channel();
098    
099                    _log.error("Closing " + channel + " due to:", throwable);
100    
101                    ChannelFuture channelFuture = channel.close();
102    
103                    channelFuture.addListener(
104                            new ChannelFutureListener() {
105    
106                                    @Override
107                                    public void operationComplete(ChannelFuture channelFuture) {
108                                            if (_log.isInfoEnabled()) {
109                                                    _log.info(channel + " is closed");
110                                            }
111                                    }
112    
113                            });
114            }
115    
116            @Override
117            protected void channelRead0(
118                    ChannelHandlerContext channelHandlerContext,
119                    NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
120    
121                    NoticeableFuture<LoadedPaths> noticeableFuture = loadPaths(
122                            channelHandlerContext.channel(), nettyFabricWorkerConfig);
123    
124                    noticeableFuture.addFutureListener(
125                            new PostLoadPathsFutureListener(
126                                    channelHandlerContext, nettyFabricWorkerConfig));
127            }
128    
129            protected NoticeableFuture<LoadedPaths> loadPaths(
130                    Channel channel,
131                    NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
132    
133                    Map<Path, Path> mergedPaths = new HashMap<>();
134    
135                    ProcessConfig processConfig =
136                            nettyFabricWorkerConfig.getProcessConfig();
137    
138                    final Map<Path, Path> bootstrapPaths = new LinkedHashMap<>();
139    
140                    for (String pathString :
141                                    processConfig.getBootstrapClassPathElements()) {
142    
143                            bootstrapPaths.put(Paths.get(pathString), null);
144                    }
145    
146                    mergedPaths.putAll(bootstrapPaths);
147    
148                    final Map<Path, Path> runtimePaths = new LinkedHashMap<>();
149    
150                    for (String pathString : processConfig.getRuntimeClassPathElements()) {
151                            runtimePaths.put(Paths.get(pathString), null);
152                    }
153    
154                    mergedPaths.putAll(runtimePaths);
155    
156                    final Map<Path, Path> inputPaths =
157                            nettyFabricWorkerConfig.getInputPathMap();
158    
159                    mergedPaths.putAll(inputPaths);
160    
161                    return new NoticeableFutureConverter<LoadedPaths, Map<Path, Path>>(
162                            _repository.getFiles(channel, mergedPaths, false)) {
163    
164                            @Override
165                            protected LoadedPaths convert(Map<Path, Path> mergedPaths)
166                                    throws IOException {
167    
168                                    Map<Path, Path> loadedInputPaths = new HashMap<>();
169    
170                                    List<Path> missedInputPaths = new ArrayList<>();
171    
172                                    for (Path path : inputPaths.keySet()) {
173                                            Path loadedInputPath = mergedPaths.get(path);
174    
175                                            if (loadedInputPath == null) {
176                                                    missedInputPaths.add(path);
177                                            }
178                                            else {
179                                                    loadedInputPaths.put(path, loadedInputPath);
180                                            }
181                                    }
182    
183                                    if (!missedInputPaths.isEmpty()) {
184                                            throw new IOException(
185                                                    "Unable to get input paths: " + missedInputPaths);
186                                    }
187    
188                                    List<Path> loadedBootstrapPaths = new ArrayList<>();
189    
190                                    List<Path> missedBootstrapPaths = new ArrayList<>();
191    
192                                    for (Path path : bootstrapPaths.keySet()) {
193                                            Path loadedBootstrapPath = mergedPaths.get(path);
194    
195                                            if (loadedBootstrapPath == null) {
196                                                    missedBootstrapPaths.add(path);
197                                            }
198                                            else {
199                                                    loadedBootstrapPaths.add(loadedBootstrapPath);
200                                            }
201                                    }
202    
203                                    if (!missedBootstrapPaths.isEmpty() && _log.isWarnEnabled()) {
204                                            _log.warn(
205                                                    "Incomplete bootstrap classpath loaded, missed: " +
206                                                            missedBootstrapPaths);
207                                    }
208    
209                                    List<Path> loadedRuntimePaths = new ArrayList<>();
210    
211                                    List<Path> missedRuntimePaths = new ArrayList<>();
212    
213                                    for (Path path : runtimePaths.keySet()) {
214                                            Path loadedRuntimePath = mergedPaths.get(path);
215    
216                                            if (loadedRuntimePath == null) {
217                                                    missedRuntimePaths.add(path);
218                                            }
219                                            else {
220                                                    loadedRuntimePaths.add(loadedRuntimePath);
221                                            }
222                                    }
223    
224                                    if (!missedRuntimePaths.isEmpty() && _log.isWarnEnabled()) {
225                                            _log.warn(
226                                                    "Incomplete runtime classpath loaded, missed: " +
227                                                            missedRuntimePaths);
228                                    }
229    
230                                    return new LoadedPaths(
231                                            loadedInputPaths,
232                                            StringUtil.merge(loadedBootstrapPaths, File.pathSeparator),
233                                            StringUtil.merge(loadedRuntimePaths, File.pathSeparator));
234                            }
235                    };
236            }
237    
238            protected void sendResult(
239                    Channel channel, long fabricWorkerId, Serializable result,
240                    Throwable t) {
241    
242                    final FabricWorkerResultProcessCallable
243                            fabricWorkerResultProcessCallable =
244                                    new FabricWorkerResultProcessCallable(
245                                            fabricWorkerId, result, t);
246    
247                    NoticeableFuture<Serializable> noticeableFuture = RPCUtil.execute(
248                            channel,
249                            new SyncProcessRPCCallable<Serializable>(
250                                    fabricWorkerResultProcessCallable));
251    
252                    NettyUtil.scheduleCancellation(
253                            channel, noticeableFuture, _executionTimeout);
254    
255                    noticeableFuture.addFutureListener(
256                            new BaseFutureListener<Serializable>() {
257    
258                                    @Override
259                                    public void completeWithException(
260                                            Future<Serializable> future, Throwable throwable) {
261    
262                                            _log.error(
263                                                    "Unable to send back fabric worker result " +
264                                                            fabricWorkerResultProcessCallable,
265                                                    throwable);
266                                    }
267    
268                            });
269            }
270    
271            protected static class FabricAgentFinishStartupProcessCallable
272                    implements ProcessCallable<Serializable> {
273    
274                    @Override
275                    public Serializable call() throws ProcessException {
276                            Channel channel = ChannelThreadLocal.getChannel();
277    
278                            NettyFabricAgentStub nettyStubFabricAgent =
279                                    NettyChannelAttributes.getNettyFabricAgentStub(channel);
280    
281                            if (nettyStubFabricAgent == null) {
282                                    throw new ProcessException(
283                                            "Unable to locate fabric agent on channel " + channel);
284                            }
285    
286                            nettyStubFabricAgent.finishStartup(_id);
287    
288                            return null;
289                    }
290    
291                    protected FabricAgentFinishStartupProcessCallable(long id) {
292                            _id = id;
293                    }
294    
295                    private static final long serialVersionUID = 1L;
296    
297                    private final long _id;
298    
299            }
300    
301            protected static class FabricWorkerResultProcessCallable
302                    implements ProcessCallable<Serializable> {
303    
304                    @Override
305                    public Serializable call() throws ProcessException {
306                            Channel channel = ChannelThreadLocal.getChannel();
307    
308                            NettyFabricAgentStub nettyStubFabricAgent =
309                                    NettyChannelAttributes.getNettyFabricAgentStub(channel);
310    
311                            if (nettyStubFabricAgent == null) {
312                                    throw new ProcessException(
313                                            "Unable to locate fabric agent on channel " + channel);
314                            }
315    
316                            NettyFabricWorkerStub<Serializable> nettyStubFabricWorker =
317                                    (NettyFabricWorkerStub<Serializable>)
318                                            nettyStubFabricAgent.takeNettyStubFabricWorker(_id);
319    
320                            if (nettyStubFabricWorker == null) {
321                                    throw new ProcessException(
322                                            "Unable to locate fabric worker on channel " + channel +
323                                                    ", with fabric worker id " + _id);
324                            }
325    
326                            if (_throwable != null) {
327                                    nettyStubFabricWorker.setException(_throwable);
328                            }
329                            else {
330                                    nettyStubFabricWorker.setResult(_result);
331                            }
332    
333                            return null;
334                    }
335    
336                    @Override
337                    public String toString() {
338                            StringBundler sb = new StringBundler(7);
339    
340                            sb.append("{id=");
341                            sb.append(_id);
342                            sb.append(", result=");
343                            sb.append(_result);
344                            sb.append(", throwable=");
345                            sb.append(_throwable);
346                            sb.append("}");
347    
348                            return sb.toString();
349                    }
350    
351                    protected FabricWorkerResultProcessCallable(
352                            long id, Serializable result, Throwable throwable) {
353    
354                            _id = id;
355                            _result = result;
356                            _throwable = throwable;
357                    }
358    
359                    private static final long serialVersionUID = 1L;
360    
361                    private final long _id;
362                    private final Serializable _result;
363                    private final Throwable _throwable;
364    
365            }
366    
367            protected static class LoadedPaths {
368    
369                    public LoadedPaths(
370                            Map<Path, Path> inputPaths, String bootstrapClassPath,
371                            String runtimeClassPath) {
372    
373                            _inputPaths = inputPaths;
374                            _bootstrapClassPath = bootstrapClassPath;
375                            _runtimeClassPath = runtimeClassPath;
376                    }
377    
378                    public Map<Path, Path> getInputPaths() {
379                            return _inputPaths;
380                    }
381    
382                    public ProcessConfig toProcessConfig(ProcessConfig processConfig)
383                            throws ProcessException {
384    
385                            Builder builder = new Builder();
386    
387                            builder.setArguments(processConfig.getArguments());
388                            builder.setBootstrapClassPath(_bootstrapClassPath);
389                            builder.setJavaExecutable(processConfig.getJavaExecutable());
390                            builder.setRuntimeClassPath(_runtimeClassPath);
391    
392                            try {
393                                    builder.setReactClassLoader(
394                                            new URLClassLoader(
395                                                    ArrayUtil.append(
396                                                            ClassPathUtil.getClassPathURLs(_bootstrapClassPath),
397                                                            ClassPathUtil.getClassPathURLs(
398                                                                    _runtimeClassPath))));
399                            }
400                            catch (MalformedURLException murle) {
401                                    throw new ProcessException(murle);
402                            }
403    
404                            return builder.build();
405                    }
406    
407                    private final String _bootstrapClassPath;
408                    private final Map<Path, Path> _inputPaths;
409                    private final String _runtimeClassPath;
410    
411            }
412    
413            protected class PostFabricWorkerExecutionFutureListener
414                    implements GenericFutureListener
415                            <io.netty.util.concurrent.Future<FabricWorker<Serializable>>> {
416    
417                    public PostFabricWorkerExecutionFutureListener(
418                            Channel channel, LoadedPaths loadedPaths,
419                            NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
420    
421                            _channel = channel;
422                            _loadedPaths = loadedPaths;
423                            _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
424                    }
425    
426                    @Override
427                    public void operationComplete(
428                                    io.netty.util.concurrent.Future<FabricWorker<Serializable>>
429                                            future)
430                            throws Exception {
431    
432                            Throwable throwable = future.cause();
433    
434                            if (throwable != null) {
435                                    sendResult(
436                                            _channel, _nettyFabricWorkerConfig.getId(), null,
437                                            throwable);
438    
439                                    return;
440                            }
441    
442                            FabricWorker<Serializable> fabricWorker = future.get();
443    
444                            NettyChannelAttributes.putFabricWorker(
445                                    _channel, _nettyFabricWorkerConfig.getId(), fabricWorker);
446    
447                            NoticeableFuture<Serializable> noticeableFuture = RPCUtil.execute(
448                                    _channel,
449                                    new SyncProcessRPCCallable<Serializable>(
450                                            new FabricAgentFinishStartupProcessCallable(
451                                                    _nettyFabricWorkerConfig.getId())));
452    
453                            NettyUtil.scheduleCancellation(
454                                    _channel, noticeableFuture, _executionTimeout);
455    
456                            noticeableFuture.addFutureListener(
457                                    new BaseFutureListener<Serializable>() {
458    
459                                            @Override
460                                            public void completeWithException(
461                                                    Future<Serializable> future, Throwable throwable) {
462    
463                                                    _log.error(
464                                                            "Unable to finish fabric worker startup",
465                                                            throwable);
466                                            }
467    
468                                    });
469    
470                            NoticeableFuture<Serializable> processNoticeableFuture =
471                                    fabricWorker.getProcessNoticeableFuture();
472    
473                            processNoticeableFuture.addFutureListener(
474                                    new PostFabricWorkerFinishFutureListener(
475                                            _channel, _nettyFabricWorkerConfig, _loadedPaths));
476                    }
477    
478                    private final Channel _channel;
479                    private final LoadedPaths _loadedPaths;
480                    private final NettyFabricWorkerConfig<Serializable>
481                            _nettyFabricWorkerConfig;
482    
483            }
484    
485            protected class PostFabricWorkerFinishFutureListener
486                    implements FutureListener<Serializable> {
487    
488                    public PostFabricWorkerFinishFutureListener(
489                            Channel channel,
490                            NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig,
491                            LoadedPaths loadedPaths) {
492    
493                            _channel = channel;
494                            _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
495                            _loadedPaths = loadedPaths;
496                    }
497    
498                    @Override
499                    public void complete(Future<Serializable> future) {
500                            Map<Path, Path> inputPaths = _loadedPaths.getInputPaths();
501    
502                            for (Path path : inputPaths.values()) {
503                                    FileHelperUtil.delete(true, path);
504                            }
505    
506                            try {
507                                    sendResult(
508                                            _channel, _nettyFabricWorkerConfig.getId(), future.get(),
509                                            null);
510                            }
511                            catch (Throwable t) {
512                                    if (t instanceof ExecutionException) {
513                                            t = t.getCause();
514                                    }
515    
516                                    sendResult(_channel, _nettyFabricWorkerConfig.getId(), null, t);
517                            }
518                    }
519    
520                    private final Channel _channel;
521                    private final LoadedPaths _loadedPaths;
522                    private final NettyFabricWorkerConfig<Serializable>
523                            _nettyFabricWorkerConfig;
524    
525            }
526    
527            protected class PostLoadPathsFutureListener
528                    extends BaseFutureListener<LoadedPaths> {
529    
530                    public PostLoadPathsFutureListener(
531                            ChannelHandlerContext channelHandlerContext,
532                            NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
533    
534                            _channelHandlerContext = channelHandlerContext;
535                            _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
536                    }
537    
538                    @Override
539                    public void completeWithException(
540                            Future<LoadedPaths> future, Throwable throwable) {
541    
542                            sendResult(
543                                    _channelHandlerContext.channel(),
544                                    _nettyFabricWorkerConfig.getId(), null, throwable);
545                    }
546    
547                    @Override
548                    public void completeWithResult(
549                            Future<LoadedPaths> loadPathsFuture,
550                            final LoadedPaths loadedPaths) {
551    
552                            EventExecutor eventExecutor = _channelHandlerContext.executor();
553    
554                            io.netty.util.concurrent.Future<FabricWorker<Serializable>> future =
555                                    eventExecutor.submit(
556                                            new Callable<FabricWorker<Serializable>>() {
557    
558                                                    @Override
559                                                    public FabricWorker<Serializable> call()
560                                                            throws ProcessException {
561    
562                                                            ProcessConfig processConfig =
563                                                                    _nettyFabricWorkerConfig.getProcessConfig();
564    
565                                                            return _fabricAgent.execute(
566                                                                    loadedPaths.toProcessConfig(processConfig),
567                                                                    _nettyFabricWorkerConfig.getProcessCallable());
568                                                    }
569    
570                                            });
571    
572                            future.addListener(
573                                    new PostFabricWorkerExecutionFutureListener(
574                                            _channelHandlerContext.channel(), loadedPaths,
575                                            _nettyFabricWorkerConfig));
576                    }
577    
578                    private final ChannelHandlerContext _channelHandlerContext;
579                    private final NettyFabricWorkerConfig<Serializable>
580                            _nettyFabricWorkerConfig;
581    
582            }
583    
584            private static final Log _log = LogFactoryUtil.getLog(
585                    NettyFabricWorkerExecutionChannelHandler.class);
586    
587            private final long _executionTimeout;
588            private final FabricAgent _fabricAgent;
589            private final Repository<Channel> _repository;
590    
591    }