001
014
015 package com.liferay.portal.kernel.nio.intraband;
016
017 import com.liferay.portal.kernel.io.BigEndianCodec;
018 import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
019 import com.liferay.portal.kernel.util.StringBundler;
020 import com.liferay.portal.kernel.util.StringPool;
021
022 import java.io.EOFException;
023 import java.io.IOException;
024
025 import java.nio.ByteBuffer;
026 import java.nio.channels.GatheringByteChannel;
027 import java.nio.channels.ScatteringByteChannel;
028
029 import java.util.EnumSet;
030
031
034 public class Datagram {
035
036 public static Datagram createRequestDatagram(byte type, byte[] data) {
037 return createRequestDatagram(type, ByteBuffer.wrap(data));
038 }
039
040 public static Datagram createRequestDatagram(
041 byte type, ByteBuffer dataByteBuffer) {
042
043 Datagram datagram = new Datagram();
044
045
046
047 datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_REQUEST;
048
049
050
051
052
053 datagram._headerBufferArray[_INDEX_DATA_TYPE] = type;
054
055
056
057 BigEndianCodec.putInt(
058 datagram._headerBufferArray, _INDEX_DATA_SIZE,
059 dataByteBuffer.remaining());
060
061
062
063 datagram._dataByteBuffer = dataByteBuffer;
064
065 return datagram;
066 }
067
068 public static Datagram createResponseDatagram(
069 Datagram requestDatagram, byte[] data) {
070
071 return createResponseDatagram(requestDatagram, ByteBuffer.wrap(data));
072 }
073
074 public static Datagram createResponseDatagram(
075 Datagram requestDatagram, ByteBuffer byteBuffer) {
076
077 Datagram datagram = new Datagram();
078
079
080
081 datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_RESPONSE;
082
083
084
085 BigEndianCodec.putLong(
086 datagram._headerBufferArray, _INDEX_SEQUENCE_ID,
087 requestDatagram.getSequenceId());
088
089
090
091
092
093 BigEndianCodec.putInt(
094 datagram._headerBufferArray, _INDEX_DATA_SIZE,
095 byteBuffer.remaining());
096
097
098
099 datagram._dataByteBuffer = byteBuffer;
100
101 return datagram;
102 }
103
104 public ByteBuffer getDataByteBuffer() {
105 return _dataByteBuffer;
106 }
107
108 public byte getType() {
109 return _headerBufferArray[_INDEX_DATA_TYPE];
110 }
111
112 @Override
113 public String toString() {
114 StringBundler sb = new StringBundler(11);
115
116 sb.append("{dataChunk=");
117
118 ByteBuffer byteBuffer = _dataByteBuffer;
119
120 if (byteBuffer == null) {
121 sb.append(StringPool.NULL);
122 }
123 else {
124 sb.append(byteBuffer.toString());
125 }
126
127 sb.append(", dataSize=");
128 sb.append(BigEndianCodec.getInt(_headerBufferArray, _INDEX_DATA_SIZE));
129 sb.append(", dataType=");
130 sb.append(_headerBufferArray[_INDEX_DATA_TYPE]);
131 sb.append(", sequenceId=");
132 sb.append(
133 BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID));
134 sb.append(", statusFlag=");
135 sb.append(_headerBufferArray[_INDEX_STATUS_FLAG]);
136 sb.append("}");
137
138 return sb.toString();
139 }
140
141 protected static Datagram createACKResponseDatagram(long sequenceId) {
142 Datagram datagram = new Datagram();
143
144
145
146 datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_ACK_RESPONSE;
147
148
149
150 BigEndianCodec.putLong(
151 datagram._headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
152
153
154
155
156
157 BigEndianCodec.putInt(datagram._headerBufferArray, _INDEX_DATA_SIZE, 0);
158
159
160
161 datagram._dataByteBuffer = _EMPTY_BUFFER;
162
163 return datagram;
164 }
165
166 protected static Datagram createReceiveDatagram() {
167 return new Datagram();
168 }
169
170 protected long getSequenceId() {
171 return BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID);
172 }
173
174 protected boolean isAckRequest() {
175 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
176
177 if ((statusFlag & _FLAG_ACK_REQUEST) != 0) {
178 return true;
179 }
180 else {
181 return false;
182 }
183 }
184
185 protected boolean isAckResponse() {
186 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
187
188 if ((statusFlag & _FLAG_ACK_RESPONSE) != 0) {
189 return true;
190 }
191 else {
192 return false;
193 }
194 }
195
196 protected boolean isRequest() {
197 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
198
199 if ((statusFlag & _FLAG_REQUEST) != 0) {
200 return true;
201 }
202 else {
203 return false;
204 }
205 }
206
207 protected boolean isResponse() {
208 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
209
210 if ((statusFlag & _FLAG_RESPONSE) != 0) {
211 return true;
212 }
213 else {
214 return false;
215 }
216 }
217
218 protected boolean readFrom(ScatteringByteChannel scatteringByteChannel)
219 throws IOException {
220
221 if (_headerByteBuffer.hasRemaining()) {
222 if (scatteringByteChannel.read(_headerByteBuffer) == -1) {
223 throw new EOFException();
224 }
225
226 if (_headerByteBuffer.hasRemaining()) {
227 return false;
228 }
229 else {
230 int dataSize = BigEndianCodec.getInt(
231 _headerBufferArray, _INDEX_DATA_SIZE);
232
233 if (dataSize == 0) {
234 _dataByteBuffer = _EMPTY_BUFFER;
235
236 return true;
237 }
238 else {
239 _dataByteBuffer = ByteBuffer.allocate(dataSize);
240 }
241 }
242 }
243
244 if (scatteringByteChannel.read(_dataByteBuffer) == -1) {
245 throw new EOFException();
246 }
247
248 if (_dataByteBuffer.hasRemaining()) {
249 return false;
250 }
251 else {
252 _dataByteBuffer.flip();
253
254 return true;
255 }
256 }
257
258 protected void setAckRequest(boolean ackRequest) {
259 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
260
261 if (ackRequest) {
262 statusFlag |= _FLAG_ACK_REQUEST;
263 }
264 else {
265 statusFlag &= ~_FLAG_ACK_REQUEST;
266 }
267
268 _headerBufferArray[_INDEX_STATUS_FLAG] = statusFlag;
269 }
270
271 protected void setSequenceId(long sequenceId) {
272 BigEndianCodec.putLong(
273 _headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
274 }
275
276 protected boolean writeTo(GatheringByteChannel gatheringByteChannel)
277 throws IOException {
278
279 if (_headerByteBuffer.hasRemaining()) {
280 ByteBuffer[] byteBuffers = new ByteBuffer[2];
281
282 byteBuffers[0] = _headerByteBuffer;
283 byteBuffers[1] = _dataByteBuffer;
284
285 gatheringByteChannel.write(byteBuffers);
286 }
287 else {
288 gatheringByteChannel.write(_dataByteBuffer);
289 }
290
291 if (_dataByteBuffer.hasRemaining()) {
292 return false;
293 }
294 else {
295 _dataByteBuffer = null;
296
297 return true;
298 }
299 }
300
301 protected Object attachment;
302 protected CompletionHandler<Object> completionHandler;
303 protected EnumSet<CompletionType> completionTypes;
304 protected long expireTime;
305 protected long timeout;
306
307 private Datagram() {
308 _headerByteBuffer = ByteBuffer.allocate(_HEADER_SIZE);
309
310
311
312
313 _headerBufferArray = _headerByteBuffer.array();
314 }
315
316 private static final ByteBuffer _EMPTY_BUFFER = ByteBuffer.allocate(0);
317
318 private static final byte _FLAG_ACK_REQUEST = 1;
319
320 private static final byte _FLAG_ACK_RESPONSE = 2;
321
322 private static final byte _FLAG_REQUEST = 4;
323
324 private static final byte _FLAG_RESPONSE = 8;
325
326 private static final int _HEADER_SIZE = 14;
327
328 private static final int _INDEX_DATA_SIZE = 10;
329
330 private static final int _INDEX_DATA_TYPE = 9;
331
332 private static final int _INDEX_SEQUENCE_ID = 1;
333
334 private static final int _INDEX_STATUS_FLAG = 0;
335
336 private ByteBuffer _dataByteBuffer;
337 private final byte[] _headerBufferArray;
338 private final ByteBuffer _headerByteBuffer;
339
340 }