001
014
015 package com.liferay.portal.fabric.netty.worker;
016
017 import com.liferay.portal.fabric.netty.handlers.NettyChannelAttributes;
018 import com.liferay.portal.fabric.netty.rpc.ChannelThreadLocal;
019 import com.liferay.portal.fabric.netty.rpc.RPCCallable;
020 import com.liferay.portal.fabric.netty.util.NettyUtil;
021 import com.liferay.portal.fabric.worker.FabricWorker;
022 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
023 import com.liferay.portal.kernel.process.ProcessCallable;
024 import com.liferay.portal.kernel.process.ProcessException;
025
026 import io.netty.channel.Channel;
027
028 import java.io.Serializable;
029
030
033 public class NettyFabricWorkerBridgeRPCCallable<T extends Serializable>
034 implements RPCCallable<T> {
035
036 public NettyFabricWorkerBridgeRPCCallable(
037 long id, ProcessCallable<T> processCallable, long rpcRelayTime) {
038
039 _id = id;
040 _processCallable = processCallable;
041 _rpcRelayTimeout = rpcRelayTime;
042 }
043
044 @Override
045 public NoticeableFuture<T> call() throws ProcessException {
046 Channel channel = ChannelThreadLocal.getChannel();
047
048 FabricWorker<T> fabricWorker = NettyChannelAttributes.getFabricWorker(
049 channel, _id);
050
051 if (fabricWorker == null) {
052 throw new ProcessException(
053 "Unable to locate fabric worker with ID " + _id);
054 }
055
056 NoticeableFuture<T> noticeableFuture = fabricWorker.write(
057 _processCallable);
058
059 NettyUtil.scheduleCancellation(
060 channel, noticeableFuture, _rpcRelayTimeout);
061
062 return noticeableFuture;
063 }
064
065 private static final long serialVersionUID = 1L;
066
067 private final long _id;
068 private final ProcessCallable<T> _processCallable;
069 private final long _rpcRelayTimeout;
070
071 }