001
014
015 package com.liferay.portal.fabric.netty.handlers;
016
017 import com.liferay.portal.fabric.netty.agent.NettyFabricAgentStub;
018 import com.liferay.portal.fabric.netty.rpc.RPCUtil;
019 import com.liferay.portal.fabric.worker.FabricWorker;
020 import com.liferay.portal.kernel.concurrent.AsyncBroker;
021 import com.liferay.portal.kernel.concurrent.FutureListener;
022 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
023
024 import io.netty.channel.Channel;
025 import io.netty.util.Attribute;
026 import io.netty.util.AttributeKey;
027
028 import java.io.Serializable;
029
030 import java.util.Map;
031 import java.util.concurrent.ConcurrentHashMap;
032 import java.util.concurrent.Future;
033 import java.util.concurrent.atomic.AtomicLong;
034
035
038 public class NettyChannelAttributes {
039
040 public static <T extends Serializable> AsyncBroker<Long, T> getAsyncBroker(
041 Channel channel) {
042
043 Attribute<AsyncBroker<Long, Serializable>> attribute = channel.attr(
044 _asyncBrokerKey);
045
046 AsyncBroker<Long, Serializable> asyncBroker = attribute.get();
047
048 if (asyncBroker == null) {
049 asyncBroker = new AsyncBroker<>();
050
051 AsyncBroker<Long, Serializable> previousAsyncBroker =
052 attribute.setIfAbsent(asyncBroker);
053
054 if (previousAsyncBroker != null) {
055 asyncBroker = previousAsyncBroker;
056 }
057 }
058
059 return (AsyncBroker<Long, T>)asyncBroker;
060 }
061
062 public static <T extends Serializable> FabricWorker<T> getFabricWorker(
063 Channel channel, long id) {
064
065 Map<Long, FabricWorker<?>> fabricWorkers = getFabricWorkers(channel);
066
067 if (fabricWorkers == null) {
068 return null;
069 }
070
071 return (FabricWorker<T>)fabricWorkers.get(id);
072 }
073
074 public static Map<Long, FabricWorker<?>> getFabricWorkers(Channel channel) {
075 Attribute<Map<Long, FabricWorker<?>>> attribute = channel.attr(
076 _fabricWorkersKey);
077
078 return attribute.get();
079 }
080
081 public static NettyFabricAgentStub getNettyFabricAgentStub(
082 Channel channel) {
083
084 Attribute<NettyFabricAgentStub> attribute = channel.attr(
085 _nettyFabricAgentStubKey);
086
087 return attribute.get();
088 }
089
090 public static long nextId(Channel channel) {
091 Attribute<AtomicLong> attribute = channel.attr(_idGeneratorKey);
092
093 AtomicLong attachmentIdGenerator = attribute.get();
094
095 if (attachmentIdGenerator == null) {
096 attachmentIdGenerator = new AtomicLong();
097
098 AtomicLong previousAttachmentIdGenerator = attribute.setIfAbsent(
099 attachmentIdGenerator);
100
101 if (previousAttachmentIdGenerator != null) {
102 attachmentIdGenerator = previousAttachmentIdGenerator;
103 }
104 }
105
106 return attachmentIdGenerator.getAndIncrement();
107 }
108
109 public static <T extends Serializable> void putFabricWorker(
110 Channel channel, final long id, FabricWorker<T> fabricWorker) {
111
112 Attribute<Map<Long, FabricWorker<?>>> attribute = channel.attr(
113 _fabricWorkersKey);
114
115 Map<Long, FabricWorker<?>> fabricWorkers = attribute.get();
116
117 if (fabricWorkers == null) {
118 fabricWorkers = new ConcurrentHashMap<>();
119
120 Map<Long, FabricWorker<?>> previousFabricWorkers =
121 attribute.setIfAbsent(fabricWorkers);
122
123 if (previousFabricWorkers != null) {
124 fabricWorkers = previousFabricWorkers;
125 }
126 }
127
128 fabricWorkers.put(id, fabricWorker);
129
130 NoticeableFuture<T> noticeableFuture =
131 fabricWorker.getProcessNoticeableFuture();
132
133 final Map<Long, FabricWorker<?>> fabricWorkersRef = fabricWorkers;
134
135 noticeableFuture.addFutureListener(
136 new FutureListener<T>() {
137
138 @Override
139 public void complete(Future<T> future) {
140 fabricWorkersRef.remove(id);
141 }
142
143 });
144 }
145
146 public static void setNettyFabricAgentStub(
147 Channel channel, NettyFabricAgentStub nettyFabricAgentStub) {
148
149 Attribute<NettyFabricAgentStub> attribute = channel.attr(
150 _nettyFabricAgentStubKey);
151
152 attribute.set(nettyFabricAgentStub);
153 }
154
155 private static final AttributeKey<AsyncBroker<Long, Serializable>>
156 _asyncBrokerKey = AttributeKey.valueOf(
157 RPCUtil.class.getName() + "-AsyncBroker");
158 private static final AttributeKey<Map<Long, FabricWorker<?>>>
159 _fabricWorkersKey = AttributeKey.valueOf(
160 NettyChannelAttributes.class.getName() + "-FabricWorkers");
161 private static final AttributeKey<AtomicLong> _idGeneratorKey =
162 AttributeKey.valueOf(RPCUtil.class.getName() + "-IdGenerator");
163 private static final AttributeKey<NettyFabricAgentStub>
164 _nettyFabricAgentStubKey = AttributeKey.valueOf(
165 NettyFabricAgentStub.class.getName());
166
167 }