001
014
015 package com.liferay.portal.fabric.netty.agent;
016
017 import com.liferay.portal.fabric.FabricPathMappingVisitor;
018 import com.liferay.portal.fabric.InputResource;
019 import com.liferay.portal.fabric.OutputResource;
020 import com.liferay.portal.fabric.agent.FabricAgent;
021 import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerConfig;
022 import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerStub;
023 import com.liferay.portal.fabric.repository.Repository;
024 import com.liferay.portal.fabric.status.FabricStatus;
025 import com.liferay.portal.fabric.status.RemoteFabricStatus;
026 import com.liferay.portal.fabric.worker.FabricWorker;
027 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
028 import com.liferay.portal.kernel.concurrent.FutureListener;
029 import com.liferay.portal.kernel.process.ProcessCallable;
030 import com.liferay.portal.kernel.process.ProcessConfig;
031 import com.liferay.portal.kernel.util.ObjectGraphUtil;
032
033 import io.netty.channel.Channel;
034 import io.netty.channel.ChannelFuture;
035 import io.netty.channel.ChannelFutureListener;
036
037 import java.io.Serializable;
038
039 import java.nio.file.Path;
040
041 import java.util.Collection;
042 import java.util.Collections;
043 import java.util.Map;
044 import java.util.concurrent.CancellationException;
045 import java.util.concurrent.ConcurrentHashMap;
046 import java.util.concurrent.ExecutionException;
047 import java.util.concurrent.Future;
048 import java.util.concurrent.TimeUnit;
049 import java.util.concurrent.atomic.AtomicLong;
050
051
054 public class NettyFabricAgentStub implements FabricAgent {
055
056 public NettyFabricAgentStub(
057 Channel channel, Repository<Channel> repository,
058 Path remoteRepositoryPath, long rpcRelayTimeout, long startupTimeout) {
059
060 if (channel == null) {
061 throw new NullPointerException("Channel is null");
062 }
063
064 if (repository == null) {
065 throw new NullPointerException("Repository is null");
066 }
067
068 if (remoteRepositoryPath == null) {
069 throw new NullPointerException("Remote repository path is null");
070 }
071
072 _channel = channel;
073 _repository = repository;
074 _remoteRepositoryPath = remoteRepositoryPath;
075 _rpcRelayTimeout = rpcRelayTimeout;
076 _startupTimeout = startupTimeout;
077 }
078
079 @Override
080 public boolean equals(Object obj) {
081 if (this == obj) {
082 return true;
083 }
084
085 if (!(obj instanceof NettyFabricAgentStub)) {
086 return false;
087 }
088
089 NettyFabricAgentStub nettyFabricAgentStub = (NettyFabricAgentStub)obj;
090
091 if (_channel.equals(nettyFabricAgentStub._channel)) {
092 return true;
093 }
094
095 return false;
096 }
097
098 @Override
099 public <T extends Serializable> FabricWorker<T> execute(
100 ProcessConfig processConfig, ProcessCallable<T> processCallable) {
101
102 final long id = _idGenerator.getAndIncrement();
103
104 FabricPathMappingVisitor fabricPathMappingVisitor =
105 new FabricPathMappingVisitor(
106 OutputResource.class, _remoteRepositoryPath, true);
107
108 ObjectGraphUtil.walkObjectGraph(
109 processCallable, fabricPathMappingVisitor);
110
111 NettyFabricWorkerStub<T> nettyFabricWorkerStub =
112 new NettyFabricWorkerStub<T>(
113 id, _channel, _repository,
114 fabricPathMappingVisitor.getPathMap(), _rpcRelayTimeout);
115
116 final DefaultNoticeableFuture<Object> startupNoticeableFuture =
117 new DefaultNoticeableFuture<>();
118
119 _startupNoticeableFutures.put(id, startupNoticeableFuture);
120
121 startupNoticeableFuture.addFutureListener(
122 new FutureListener<Object>() {
123
124 @Override
125 public void complete(Future<Object> future) {
126 _startupNoticeableFutures.remove(id);
127 }
128
129 });
130
131 fabricPathMappingVisitor = new FabricPathMappingVisitor(
132 InputResource.class, _remoteRepositoryPath);
133
134 ObjectGraphUtil.walkObjectGraph(
135 processCallable, fabricPathMappingVisitor);
136
137 ChannelFuture channelFuture = _channel.writeAndFlush(
138 new NettyFabricWorkerConfig<T>(
139 id, processConfig, processCallable,
140 fabricPathMappingVisitor.getPathMap()));
141
142 channelFuture.addListener(
143 new ChannelFutureListener() {
144
145 @Override
146 public void operationComplete(ChannelFuture channelFuture) {
147 if (channelFuture.isSuccess()) {
148 return;
149 }
150
151 if (channelFuture.isCancelled()) {
152 startupNoticeableFuture.cancel(true);
153
154 return;
155 }
156
157 startupNoticeableFuture.setException(channelFuture.cause());
158 }
159
160 });
161
162 final ChannelFutureListener channelFutureListener =
163 new ChannelFutureListener() {
164
165 @Override
166 public void operationComplete(ChannelFuture channelFuture) {
167 startupNoticeableFuture.cancel(true);
168 }
169
170 };
171
172 final ChannelFuture closeChannelFuture = _channel.closeFuture();
173
174 closeChannelFuture.addListener(channelFutureListener);
175
176 startupNoticeableFuture.addFutureListener(
177 new FutureListener<Object>() {
178
179 @Override
180 public void complete(Future<Object> future) {
181 closeChannelFuture.removeListener(channelFutureListener);
182 }
183
184 });
185
186 try {
187 startupNoticeableFuture.get(_startupTimeout, TimeUnit.MILLISECONDS);
188
189 _nettyFabricWorkerStubs.put(id, nettyFabricWorkerStub);
190 }
191 catch (CancellationException ce) {
192 nettyFabricWorkerStub.setCancel();
193 }
194 catch (Throwable t) {
195 if (t instanceof ExecutionException) {
196 t = t.getCause();
197 }
198
199 nettyFabricWorkerStub.setException(t);
200 }
201
202 return nettyFabricWorkerStub;
203 }
204
205 public void finishStartup(long id) {
206 DefaultNoticeableFuture<?> startupNoticeabeFuture =
207 _startupNoticeableFutures.remove(id);
208
209 if (startupNoticeabeFuture != null) {
210 startupNoticeabeFuture.run();
211 }
212 }
213
214 @Override
215 public FabricStatus getFabricStatus() {
216 return new RemoteFabricStatus(
217 new NettyFabricAgentProcessCallableExecutor(_channel));
218 }
219
220 @Override
221 public Collection<? extends FabricWorker<?>> getFabricWorkers() {
222 return Collections.unmodifiableCollection(
223 _nettyFabricWorkerStubs.values());
224 }
225
226 @Override
227 public int hashCode() {
228 return _channel.hashCode();
229 }
230
231 public NettyFabricWorkerStub<?> takeNettyStubFabricWorker(long id) {
232 return _nettyFabricWorkerStubs.remove(id);
233 }
234
235 private final Channel _channel;
236 private final AtomicLong _idGenerator = new AtomicLong();
237 private final Map<Long, NettyFabricWorkerStub<?>>
238 _nettyFabricWorkerStubs = new ConcurrentHashMap<>();
239 private final Path _remoteRepositoryPath;
240 private final Repository<Channel> _repository;
241 private final long _rpcRelayTimeout;
242 private final Map<Long, DefaultNoticeableFuture<?>>
243 _startupNoticeableFutures = new ConcurrentHashMap<>();
244 private final long _startupTimeout;
245
246 }