001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.worker;
016    
017    import com.liferay.portal.fabric.netty.rpc.RPCUtil;
018    import com.liferay.portal.fabric.repository.Repository;
019    import com.liferay.portal.fabric.status.FabricStatus;
020    import com.liferay.portal.fabric.status.RemoteFabricStatus;
021    import com.liferay.portal.fabric.worker.FabricWorker;
022    import com.liferay.portal.kernel.concurrent.BaseFutureListener;
023    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
024    import com.liferay.portal.kernel.concurrent.FutureListener;
025    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
026    import com.liferay.portal.kernel.process.ProcessCallable;
027    
028    import io.netty.channel.Channel;
029    import io.netty.channel.ChannelFuture;
030    import io.netty.channel.ChannelFutureListener;
031    
032    import java.io.Serializable;
033    
034    import java.nio.file.Path;
035    
036    import java.util.Map;
037    import java.util.concurrent.Future;
038    
039    /**
040     * @author Shuyang Zhou
041     */
042    public class NettyFabricWorkerStub<T extends Serializable>
043            implements FabricWorker<T> {
044    
045            public NettyFabricWorkerStub(
046                    long id, Channel channel, Repository<Channel> repository,
047                    Map<Path, Path> outputPathMap, long rpcRelayTimeout) {
048    
049                    if (channel == null) {
050                            throw new NullPointerException("Channel is null");
051                    }
052    
053                    if (repository == null) {
054                            throw new NullPointerException("Repository is null");
055                    }
056    
057                    if (outputPathMap == null) {
058                            throw new NullPointerException("Output path map is null");
059                    }
060    
061                    _id = id;
062                    _channel = channel;
063                    _repository = repository;
064                    _outputPathMap = outputPathMap;
065                    _rpcRelayTimeout = rpcRelayTimeout;
066    
067                    final ChannelFuture channelFuture = _channel.closeFuture();
068    
069                    final ChannelFutureListener channelCloseListener =
070                            new ChannelFutureListener() {
071    
072                                    @Override
073                                    public void operationComplete(ChannelFuture channelFuture) {
074                                            _defaultNoticeableFuture.cancel(true);
075                                    }
076    
077                            };
078    
079                    channelFuture.addListener(channelCloseListener);
080    
081                    _defaultNoticeableFuture.addFutureListener(
082                            new FutureListener<T>() {
083    
084                                    @Override
085                                    public void complete(Future<T> future) {
086                                            channelFuture.removeListener(channelCloseListener);
087                                    }
088    
089                            });
090            }
091    
092            @Override
093            public FabricStatus getFabricStatus() {
094                    return new RemoteFabricStatus(
095                            new NettyFabricWorkerProcessCallableExecutor(
096                                    _channel, _id, _rpcRelayTimeout));
097            }
098    
099            @Override
100            public NoticeableFuture<T> getProcessNoticeableFuture() {
101                    return _defaultNoticeableFuture;
102            }
103    
104            public void setCancel() {
105                    _defaultNoticeableFuture.cancel(true);
106            }
107    
108            public void setException(Throwable t) {
109                    _defaultNoticeableFuture.setException(t);
110            }
111    
112            public void setResult(final T result) {
113                    NoticeableFuture<Map<Path, Path>> noticeableFuture =
114                            _repository.getFiles(_channel, _outputPathMap, true);
115    
116                    noticeableFuture.addFutureListener(
117                            new BaseFutureListener<Map<Path, Path>>() {
118    
119                                    @Override
120                                    public void completeWithCancel(Future<Map<Path, Path>> future) {
121                                            _defaultNoticeableFuture.cancel(true);
122                                    }
123    
124                                    @Override
125                                    public void completeWithException(
126                                            Future<Map<Path, Path>> future, Throwable throwable) {
127    
128                                            _defaultNoticeableFuture.setException(throwable);
129                                    }
130    
131                                    @Override
132                                    public void completeWithResult(
133                                            Future<Map<Path, Path>> future, Map<Path, Path> map) {
134    
135                                            _defaultNoticeableFuture.set(result);
136                                    }
137    
138                            });
139            }
140    
141            @Override
142            public <V extends Serializable> NoticeableFuture<V> write(
143                    ProcessCallable<V> processCallable) {
144    
145                    return RPCUtil.execute(
146                            _channel,
147                            new NettyFabricWorkerBridgeRPCCallable<V>(
148                                    _id, processCallable, _rpcRelayTimeout));
149            }
150    
151            private final Channel _channel;
152            private final DefaultNoticeableFuture<T> _defaultNoticeableFuture =
153                    new DefaultNoticeableFuture<>();
154            private final long _id;
155            private final Map<Path, Path> _outputPathMap;
156            private final Repository<Channel> _repository;
157            private final long _rpcRelayTimeout;
158    
159    }