001
014
015 package com.liferay.portal.kernel.nio.intraband;
016
017 import com.liferay.portal.kernel.io.BigEndianCodec;
018 import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
019 import com.liferay.portal.kernel.util.StringBundler;
020 import com.liferay.portal.kernel.util.StringPool;
021
022 import java.io.EOFException;
023 import java.io.IOException;
024
025 import java.nio.ByteBuffer;
026 import java.nio.channels.GatheringByteChannel;
027 import java.nio.channels.ScatteringByteChannel;
028
029 import java.util.EnumSet;
030
031
034 public class Datagram {
035
036 public static Datagram createRequestDatagram(byte type, byte[] data) {
037 return createRequestDatagram(type, ByteBuffer.wrap(data));
038 }
039
040 public static Datagram createRequestDatagram(
041 byte type, ByteBuffer dataByteBuffer) {
042
043 Datagram datagram = new Datagram();
044
045
046
047 datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_REQUEST;
048
049
050
051
052
053 datagram._headerBufferArray[_INDEX_DATA_TYPE] = type;
054
055
056
057 BigEndianCodec.putInt(
058 datagram._headerBufferArray, _INDEX_DATA_SIZE,
059 dataByteBuffer.remaining());
060
061
062
063 datagram._dataByteBuffer = dataByteBuffer;
064
065 return datagram;
066 }
067
068 public static Datagram createResponseDatagram(
069 Datagram requestDatagram, byte[] data) {
070
071 return createResponseDatagram(requestDatagram, ByteBuffer.wrap(data));
072 }
073
074 public static Datagram createResponseDatagram(
075 Datagram requestDatagram, ByteBuffer byteBuffer) {
076
077 Datagram datagram = new Datagram();
078
079
080
081 datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_RESPONSE;
082
083
084
085 BigEndianCodec.putLong(
086 datagram._headerBufferArray, _INDEX_SEQUENCE_ID,
087 requestDatagram.getSequenceId());
088
089
090
091
092
093 BigEndianCodec.putInt(
094 datagram._headerBufferArray, _INDEX_DATA_SIZE,
095 byteBuffer.remaining());
096
097
098
099 datagram._dataByteBuffer = byteBuffer;
100
101 return datagram;
102 }
103
104 public ByteBuffer getDataByteBuffer() {
105 return _dataByteBuffer;
106 }
107
108 public byte getType() {
109 return _headerBufferArray[_INDEX_DATA_TYPE];
110 }
111
112 @Override
113 public String toString() {
114 StringBundler sb = new StringBundler(11);
115
116 sb.append("{dataChunk=");
117
118 ByteBuffer byteBuffer = _dataByteBuffer;
119
120 if (byteBuffer == null) {
121 sb.append(StringPool.NULL);
122 }
123 else {
124 sb.append(byteBuffer.toString());
125 }
126
127 sb.append(", dataSize=");
128 sb.append(BigEndianCodec.getInt(_headerBufferArray, _INDEX_DATA_SIZE));
129 sb.append(", dataType=");
130 sb.append(_headerBufferArray[_INDEX_DATA_TYPE]);
131 sb.append(", sequenceId=");
132 sb.append(
133 BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID));
134 sb.append(", statusFlag=");
135 sb.append(_headerBufferArray[_INDEX_STATUS_FLAG]);
136 sb.append("}");
137
138 return sb.toString();
139 }
140
141 protected static Datagram createACKResponseDatagram(long sequenceId) {
142 Datagram datagram = new Datagram();
143
144
145
146 datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_ACK_RESPONSE;
147
148
149
150 BigEndianCodec.putLong(
151 datagram._headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
152
153
154
155
156
157 BigEndianCodec.putInt(datagram._headerBufferArray, _INDEX_DATA_SIZE, 0);
158
159
160
161 datagram._dataByteBuffer = _EMPTY_BUFFER;
162
163 return datagram;
164 }
165
166 protected static Datagram createReceiveDatagram() {
167 return new Datagram();
168 }
169
170 protected long getSequenceId() {
171 return BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID);
172 }
173
174 protected boolean isAckRequest() {
175 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
176
177 if ((statusFlag & _FLAG_ACK_REQUEST) != 0) {
178 return true;
179 }
180 else {
181 return false;
182 }
183 }
184
185 protected boolean isAckResponse() {
186 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
187
188 if ((statusFlag & _FLAG_ACK_RESPONSE) != 0) {
189 return true;
190 }
191 else {
192 return false;
193 }
194 }
195
196 protected boolean isRequest() {
197 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
198
199 if ((statusFlag & _FLAG_REQUEST) != 0) {
200 return true;
201 }
202 else {
203 return false;
204 }
205 }
206
207 protected boolean isResponse() {
208 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
209
210 if ((statusFlag & _FLAG_RESPONSE) != 0) {
211 return true;
212 }
213 else {
214 return false;
215 }
216 }
217
218 protected boolean readFrom(ScatteringByteChannel scatteringByteChannel)
219 throws IOException {
220
221 if (_headerByteBuffer.hasRemaining()) {
222 if (scatteringByteChannel.read(_headerByteBuffer) == -1) {
223 throw new EOFException();
224 }
225
226 if (_headerByteBuffer.hasRemaining()) {
227 return false;
228 }
229
230 int dataSize = BigEndianCodec.getInt(
231 _headerBufferArray, _INDEX_DATA_SIZE);
232
233 if (dataSize == 0) {
234 _dataByteBuffer = _EMPTY_BUFFER;
235
236 return true;
237 }
238
239 _dataByteBuffer = ByteBuffer.allocate(dataSize);
240 }
241
242 if (scatteringByteChannel.read(_dataByteBuffer) == -1) {
243 throw new EOFException();
244 }
245
246 if (_dataByteBuffer.hasRemaining()) {
247 return false;
248 }
249
250 _dataByteBuffer.flip();
251
252 return true;
253 }
254
255 protected void setAckRequest(boolean ackRequest) {
256 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
257
258 if (ackRequest) {
259 statusFlag |= _FLAG_ACK_REQUEST;
260 }
261 else {
262 statusFlag &= ~_FLAG_ACK_REQUEST;
263 }
264
265 _headerBufferArray[_INDEX_STATUS_FLAG] = statusFlag;
266 }
267
268 protected void setSequenceId(long sequenceId) {
269 BigEndianCodec.putLong(
270 _headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
271 }
272
273 protected boolean writeTo(GatheringByteChannel gatheringByteChannel)
274 throws IOException {
275
276 if (_headerByteBuffer.hasRemaining()) {
277 ByteBuffer[] byteBuffers = new ByteBuffer[2];
278
279 byteBuffers[0] = _headerByteBuffer;
280 byteBuffers[1] = _dataByteBuffer;
281
282 gatheringByteChannel.write(byteBuffers);
283 }
284 else {
285 gatheringByteChannel.write(_dataByteBuffer);
286 }
287
288 if (_dataByteBuffer.hasRemaining()) {
289 return false;
290 }
291
292 _dataByteBuffer = null;
293
294 return true;
295 }
296
297 protected Object attachment;
298 protected CompletionHandler<Object> completionHandler;
299 protected EnumSet<CompletionType> completionTypes;
300 protected long expireTime;
301 protected long timeout;
302
303 private Datagram() {
304 _headerByteBuffer = ByteBuffer.allocate(_HEADER_SIZE);
305
306
307
308
309 _headerBufferArray = _headerByteBuffer.array();
310 }
311
312 private static final ByteBuffer _EMPTY_BUFFER = ByteBuffer.allocate(0);
313
314 private static final byte _FLAG_ACK_REQUEST = 1;
315
316 private static final byte _FLAG_ACK_RESPONSE = 2;
317
318 private static final byte _FLAG_REQUEST = 4;
319
320 private static final byte _FLAG_RESPONSE = 8;
321
322 private static final int _HEADER_SIZE = 14;
323
324 private static final int _INDEX_DATA_SIZE = 10;
325
326 private static final int _INDEX_DATA_TYPE = 9;
327
328 private static final int _INDEX_SEQUENCE_ID = 1;
329
330 private static final int _INDEX_STATUS_FLAG = 0;
331
332 private ByteBuffer _dataByteBuffer;
333 private final byte[] _headerBufferArray;
334 private final ByteBuffer _headerByteBuffer;
335
336 }