001
014
015 package com.liferay.portal.fabric.netty.worker;
016
017 import com.liferay.portal.fabric.netty.rpc.RPCUtil;
018 import com.liferay.portal.fabric.repository.Repository;
019 import com.liferay.portal.fabric.status.FabricStatus;
020 import com.liferay.portal.fabric.status.RemoteFabricStatus;
021 import com.liferay.portal.fabric.worker.FabricWorker;
022 import com.liferay.portal.kernel.concurrent.BaseFutureListener;
023 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
024 import com.liferay.portal.kernel.concurrent.FutureListener;
025 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
026 import com.liferay.portal.kernel.process.ProcessCallable;
027
028 import io.netty.channel.Channel;
029 import io.netty.channel.ChannelFuture;
030 import io.netty.channel.ChannelFutureListener;
031
032 import java.io.Serializable;
033
034 import java.nio.file.Path;
035
036 import java.util.Map;
037 import java.util.concurrent.Future;
038
039
042 public class NettyFabricWorkerStub<T extends Serializable>
043 implements FabricWorker<T> {
044
045 public NettyFabricWorkerStub(
046 long id, Channel channel, Repository<Channel> repository,
047 Map<Path, Path> outputPathMap, long rpcRelayTimeout) {
048
049 if (channel == null) {
050 throw new NullPointerException("Channel is null");
051 }
052
053 if (repository == null) {
054 throw new NullPointerException("Repository is null");
055 }
056
057 if (outputPathMap == null) {
058 throw new NullPointerException("Output path map is null");
059 }
060
061 _id = id;
062 _channel = channel;
063 _repository = repository;
064 _outputPathMap = outputPathMap;
065 _rpcRelayTimeout = rpcRelayTimeout;
066
067 final ChannelFuture channelFuture = _channel.closeFuture();
068
069 final ChannelFutureListener channelCloseListener =
070 new ChannelFutureListener() {
071
072 @Override
073 public void operationComplete(ChannelFuture channelFuture) {
074 _defaultNoticeableFuture.cancel(true);
075 }
076
077 };
078
079 channelFuture.addListener(channelCloseListener);
080
081 _defaultNoticeableFuture.addFutureListener(
082 new FutureListener<T>() {
083
084 @Override
085 public void complete(Future<T> future) {
086 channelFuture.removeListener(channelCloseListener);
087 }
088
089 });
090 }
091
092 @Override
093 public FabricStatus getFabricStatus() {
094 return new RemoteFabricStatus(
095 new NettyFabricWorkerProcessCallableExecutor(
096 _channel, _id, _rpcRelayTimeout));
097 }
098
099 @Override
100 public NoticeableFuture<T> getProcessNoticeableFuture() {
101 return _defaultNoticeableFuture;
102 }
103
104 public void setCancel() {
105 _defaultNoticeableFuture.cancel(true);
106 }
107
108 public void setException(Throwable t) {
109 _defaultNoticeableFuture.setException(t);
110 }
111
112 public void setResult(final T result) {
113 NoticeableFuture<Map<Path, Path>> noticeableFuture =
114 _repository.getFiles(_channel, _outputPathMap, true);
115
116 noticeableFuture.addFutureListener(
117 new BaseFutureListener<Map<Path, Path>>() {
118
119 @Override
120 public void completeWithCancel(Future<Map<Path, Path>> future) {
121 _defaultNoticeableFuture.cancel(true);
122 }
123
124 @Override
125 public void completeWithException(
126 Future<Map<Path, Path>> future, Throwable throwable) {
127
128 _defaultNoticeableFuture.setException(throwable);
129 }
130
131 @Override
132 public void completeWithResult(
133 Future<Map<Path, Path>> future, Map<Path, Path> map) {
134
135 _defaultNoticeableFuture.set(result);
136 }
137
138 });
139 }
140
141 @Override
142 public <V extends Serializable> NoticeableFuture<V> write(
143 ProcessCallable<V> processCallable) {
144
145 return RPCUtil.execute(
146 _channel,
147 new NettyFabricWorkerBridgeRPCCallable<V>(
148 _id, processCallable, _rpcRelayTimeout));
149 }
150
151 private final Channel _channel;
152 private final DefaultNoticeableFuture<T> _defaultNoticeableFuture =
153 new DefaultNoticeableFuture<>();
154 private final long _id;
155 private final Map<Path, Path> _outputPathMap;
156 private final Repository<Channel> _repository;
157 private final long _rpcRelayTimeout;
158
159 }