001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.agent;
016    
017    import com.liferay.portal.fabric.FabricPathMappingVisitor;
018    import com.liferay.portal.fabric.InputResource;
019    import com.liferay.portal.fabric.OutputResource;
020    import com.liferay.portal.fabric.agent.FabricAgent;
021    import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerConfig;
022    import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerStub;
023    import com.liferay.portal.fabric.repository.Repository;
024    import com.liferay.portal.fabric.status.FabricStatus;
025    import com.liferay.portal.fabric.status.RemoteFabricStatus;
026    import com.liferay.portal.fabric.worker.FabricWorker;
027    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
028    import com.liferay.portal.kernel.concurrent.FutureListener;
029    import com.liferay.portal.kernel.process.ProcessCallable;
030    import com.liferay.portal.kernel.process.ProcessConfig;
031    import com.liferay.portal.kernel.util.ObjectGraphUtil;
032    
033    import io.netty.channel.Channel;
034    import io.netty.channel.ChannelFuture;
035    import io.netty.channel.ChannelFutureListener;
036    
037    import java.io.Serializable;
038    
039    import java.nio.file.Path;
040    
041    import java.util.Collection;
042    import java.util.Collections;
043    import java.util.Map;
044    import java.util.concurrent.CancellationException;
045    import java.util.concurrent.ConcurrentHashMap;
046    import java.util.concurrent.ExecutionException;
047    import java.util.concurrent.Future;
048    import java.util.concurrent.TimeUnit;
049    import java.util.concurrent.atomic.AtomicLong;
050    
051    /**
052     * @author Shuyang Zhou
053     */
054    public class NettyFabricAgentStub implements FabricAgent {
055    
056            public NettyFabricAgentStub(
057                    Channel channel, Repository<Channel> repository,
058                    Path remoteRepositoryPath, long rpcRelayTimeout, long startupTimeout) {
059    
060                    if (channel == null) {
061                            throw new NullPointerException("Channel is null");
062                    }
063    
064                    if (repository == null) {
065                            throw new NullPointerException("Repository is null");
066                    }
067    
068                    if (remoteRepositoryPath == null) {
069                            throw new NullPointerException("Remote repository path is null");
070                    }
071    
072                    _channel = channel;
073                    _repository = repository;
074                    _remoteRepositoryPath = remoteRepositoryPath;
075                    _rpcRelayTimeout = rpcRelayTimeout;
076                    _startupTimeout = startupTimeout;
077            }
078    
079            @Override
080            public boolean equals(Object obj) {
081                    if (this == obj) {
082                            return true;
083                    }
084    
085                    if (!(obj instanceof NettyFabricAgentStub)) {
086                            return false;
087                    }
088    
089                    NettyFabricAgentStub nettyFabricAgentStub = (NettyFabricAgentStub)obj;
090    
091                    if (_channel.equals(nettyFabricAgentStub._channel)) {
092                            return true;
093                    }
094    
095                    return false;
096            }
097    
098            @Override
099            public <T extends Serializable> FabricWorker<T> execute(
100                    ProcessConfig processConfig, ProcessCallable<T> processCallable) {
101    
102                    final long id = _idGenerator.getAndIncrement();
103    
104                    FabricPathMappingVisitor fabricPathMappingVisitor =
105                            new FabricPathMappingVisitor(
106                                    OutputResource.class, _remoteRepositoryPath, true);
107    
108                    ObjectGraphUtil.walkObjectGraph(
109                            processCallable, fabricPathMappingVisitor);
110    
111                    NettyFabricWorkerStub<T> nettyFabricWorkerStub =
112                            new NettyFabricWorkerStub<T>(
113                                    id, _channel, _repository,
114                                    fabricPathMappingVisitor.getPathMap(), _rpcRelayTimeout);
115    
116                    final DefaultNoticeableFuture<Object> startupNoticeableFuture =
117                            new DefaultNoticeableFuture<>();
118    
119                    _startupNoticeableFutures.put(id, startupNoticeableFuture);
120    
121                    startupNoticeableFuture.addFutureListener(
122                            new FutureListener<Object>() {
123    
124                                    @Override
125                                    public void complete(Future<Object> future) {
126                                            _startupNoticeableFutures.remove(id);
127                                    }
128    
129                            });
130    
131                    fabricPathMappingVisitor = new FabricPathMappingVisitor(
132                            InputResource.class, _remoteRepositoryPath);
133    
134                    ObjectGraphUtil.walkObjectGraph(
135                            processCallable, fabricPathMappingVisitor);
136    
137                    ChannelFuture channelFuture = _channel.writeAndFlush(
138                            new NettyFabricWorkerConfig<T>(
139                                    id, processConfig, processCallable,
140                                    fabricPathMappingVisitor.getPathMap()));
141    
142                    channelFuture.addListener(
143                            new ChannelFutureListener() {
144    
145                                    @Override
146                                    public void operationComplete(ChannelFuture channelFuture) {
147                                            if (channelFuture.isSuccess()) {
148                                                    return;
149                                            }
150    
151                                            if (channelFuture.isCancelled()) {
152                                                    startupNoticeableFuture.cancel(true);
153    
154                                                    return;
155                                            }
156    
157                                            startupNoticeableFuture.setException(channelFuture.cause());
158                                    }
159    
160                            });
161    
162                    final ChannelFutureListener channelFutureListener =
163                            new ChannelFutureListener() {
164    
165                                    @Override
166                                    public void operationComplete(ChannelFuture channelFuture) {
167                                            startupNoticeableFuture.cancel(true);
168                                    }
169    
170                            };
171    
172                    final ChannelFuture closeChannelFuture = _channel.closeFuture();
173    
174                    closeChannelFuture.addListener(channelFutureListener);
175    
176                    startupNoticeableFuture.addFutureListener(
177                            new FutureListener<Object>() {
178    
179                                    @Override
180                                    public void complete(Future<Object> future) {
181                                            closeChannelFuture.removeListener(channelFutureListener);
182                                    }
183    
184                            });
185    
186                    try {
187                            startupNoticeableFuture.get(_startupTimeout, TimeUnit.MILLISECONDS);
188    
189                            _nettyFabricWorkerStubs.put(id, nettyFabricWorkerStub);
190                    }
191                    catch (CancellationException ce) {
192                            nettyFabricWorkerStub.setCancel();
193                    }
194                    catch (Throwable t) {
195                            if (t instanceof ExecutionException) {
196                                    t = t.getCause();
197                            }
198    
199                            nettyFabricWorkerStub.setException(t);
200                    }
201    
202                    return nettyFabricWorkerStub;
203            }
204    
205            public void finishStartup(long id) {
206                    DefaultNoticeableFuture<?> startupNoticeabeFuture =
207                            _startupNoticeableFutures.remove(id);
208    
209                    if (startupNoticeabeFuture != null) {
210                            startupNoticeabeFuture.run();
211                    }
212            }
213    
214            @Override
215            public FabricStatus getFabricStatus() {
216                    return new RemoteFabricStatus(
217                            new NettyFabricAgentProcessCallableExecutor(_channel));
218            }
219    
220            @Override
221            public Collection<? extends FabricWorker<?>> getFabricWorkers() {
222                    return Collections.unmodifiableCollection(
223                            _nettyFabricWorkerStubs.values());
224            }
225    
226            @Override
227            public int hashCode() {
228                    return _channel.hashCode();
229            }
230    
231            public NettyFabricWorkerStub<?> takeNettyStubFabricWorker(long id) {
232                    return _nettyFabricWorkerStubs.remove(id);
233            }
234    
235            private final Channel _channel;
236            private final AtomicLong _idGenerator = new AtomicLong();
237            private final Map<Long, NettyFabricWorkerStub<?>>
238                    _nettyFabricWorkerStubs = new ConcurrentHashMap<>();
239            private final Path _remoteRepositoryPath;
240            private final Repository<Channel> _repository;
241            private final long _rpcRelayTimeout;
242            private final Map<Long, DefaultNoticeableFuture<?>>
243                    _startupNoticeableFutures = new ConcurrentHashMap<>();
244            private final long _startupTimeout;
245    
246    }