001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband;
016    
017    import com.liferay.portal.kernel.io.BigEndianCodec;
018    import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
019    import com.liferay.portal.kernel.util.StringBundler;
020    import com.liferay.portal.kernel.util.StringPool;
021    
022    import java.io.EOFException;
023    import java.io.IOException;
024    
025    import java.nio.ByteBuffer;
026    import java.nio.channels.GatheringByteChannel;
027    import java.nio.channels.ScatteringByteChannel;
028    
029    import java.util.EnumSet;
030    
031    /**
032     * @author Shuyang Zhou
033     */
034    public class Datagram {
035    
036            public static Datagram createRequestDatagram(byte type, byte[] data) {
037                    return createRequestDatagram(type, ByteBuffer.wrap(data));
038            }
039    
040            public static Datagram createRequestDatagram(
041                    byte type, ByteBuffer dataByteBuffer) {
042    
043                    Datagram datagram = new Datagram();
044    
045                    // Status flag
046    
047                    datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_REQUEST;
048    
049                    // Request datagram does not set the sequence ID
050    
051                    // Data type
052    
053                    datagram._headerBufferArray[_INDEX_DATA_TYPE] = type;
054    
055                    // Data size
056    
057                    BigEndianCodec.putInt(
058                            datagram._headerBufferArray, _INDEX_DATA_SIZE,
059                            dataByteBuffer.remaining());
060    
061                    // Data chunk
062    
063                    datagram._dataByteBuffer = dataByteBuffer;
064    
065                    return datagram;
066            }
067    
068            public static Datagram createResponseDatagram(
069                    Datagram requestDatagram, byte[] data) {
070    
071                    return createResponseDatagram(requestDatagram, ByteBuffer.wrap(data));
072            }
073    
074            public static Datagram createResponseDatagram(
075                    Datagram requestDatagram, ByteBuffer byteBuffer) {
076    
077                    Datagram datagram = new Datagram();
078    
079                    // Status flag
080    
081                    datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_RESPONSE;
082    
083                    // Sequence ID
084    
085                    BigEndianCodec.putLong(
086                            datagram._headerBufferArray, _INDEX_SEQUENCE_ID,
087                            requestDatagram.getSequenceId());
088    
089                    // Response datagram does not set the data type
090    
091                    // Data size
092    
093                    BigEndianCodec.putInt(
094                            datagram._headerBufferArray, _INDEX_DATA_SIZE,
095                            byteBuffer.remaining());
096    
097                    // Data chunk
098    
099                    datagram._dataByteBuffer = byteBuffer;
100    
101                    return datagram;
102            }
103    
104            public ByteBuffer getDataByteBuffer() {
105                    return _dataByteBuffer;
106            }
107    
108            public byte getType() {
109                    return _headerBufferArray[_INDEX_DATA_TYPE];
110            }
111    
112            @Override
113            public String toString() {
114                    StringBundler sb = new StringBundler(11);
115    
116                    sb.append("{dataChunk=");
117    
118                    ByteBuffer byteBuffer = _dataByteBuffer;
119    
120                    if (byteBuffer == null) {
121                            sb.append(StringPool.NULL);
122                    }
123                    else {
124                            sb.append(byteBuffer.toString());
125                    }
126    
127                    sb.append(", dataSize=");
128                    sb.append(BigEndianCodec.getInt(_headerBufferArray, _INDEX_DATA_SIZE));
129                    sb.append(", dataType=");
130                    sb.append(_headerBufferArray[_INDEX_DATA_TYPE]);
131                    sb.append(", sequenceId=");
132                    sb.append(
133                            BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID));
134                    sb.append(", statusFlag=");
135                    sb.append(_headerBufferArray[_INDEX_STATUS_FLAG]);
136                    sb.append("}");
137    
138                    return sb.toString();
139            }
140    
141            protected static Datagram createACKResponseDatagram(long sequenceId) {
142                    Datagram datagram = new Datagram();
143    
144                    // Status flag
145    
146                    datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_ACK_RESPONSE;
147    
148                    // Sequence ID
149    
150                    BigEndianCodec.putLong(
151                            datagram._headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
152    
153                    // ACK response datagram does not set the data type
154    
155                    // Data size
156    
157                    BigEndianCodec.putInt(datagram._headerBufferArray, _INDEX_DATA_SIZE, 0);
158    
159                    // Data chunk
160    
161                    datagram._dataByteBuffer = _EMPTY_BUFFER;
162    
163                    return datagram;
164            }
165    
166            protected static Datagram createReceiveDatagram() {
167                    return new Datagram();
168            }
169    
170            protected long getSequenceId() {
171                    return BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID);
172            }
173    
174            protected boolean isAckRequest() {
175                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
176    
177                    if ((statusFlag & _FLAG_ACK_REQUEST) != 0) {
178                            return true;
179                    }
180                    else {
181                            return false;
182                    }
183            }
184    
185            protected boolean isAckResponse() {
186                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
187    
188                    if ((statusFlag & _FLAG_ACK_RESPONSE) != 0) {
189                            return true;
190                    }
191                    else {
192                            return false;
193                    }
194            }
195    
196            protected boolean isRequest() {
197                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
198    
199                    if ((statusFlag & _FLAG_REQUEST) != 0) {
200                            return true;
201                    }
202                    else {
203                            return false;
204                    }
205            }
206    
207            protected boolean isResponse() {
208                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
209    
210                    if ((statusFlag & _FLAG_RESPONSE) != 0) {
211                            return true;
212                    }
213                    else {
214                            return false;
215                    }
216            }
217    
218            protected boolean readFrom(ScatteringByteChannel scatteringByteChannel)
219                    throws IOException {
220    
221                    if (_headerByteBuffer.hasRemaining()) {
222                            if (scatteringByteChannel.read(_headerByteBuffer) == -1) {
223                                    throw new EOFException();
224                            }
225    
226                            if (_headerByteBuffer.hasRemaining()) {
227                                    return false;
228                            }
229    
230                            int dataSize = BigEndianCodec.getInt(
231                                    _headerBufferArray, _INDEX_DATA_SIZE);
232    
233                            if (dataSize == 0) {
234                                    _dataByteBuffer = _EMPTY_BUFFER;
235    
236                                    return true;
237                            }
238    
239                            _dataByteBuffer = ByteBuffer.allocate(dataSize);
240                    }
241    
242                    if (scatteringByteChannel.read(_dataByteBuffer) == -1) {
243                            throw new EOFException();
244                    }
245    
246                    if (_dataByteBuffer.hasRemaining()) {
247                            return false;
248                    }
249    
250                    _dataByteBuffer.flip();
251    
252                    return true;
253            }
254    
255            protected void setAckRequest(boolean ackRequest) {
256                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
257    
258                    if (ackRequest) {
259                            statusFlag |= _FLAG_ACK_REQUEST;
260                    }
261                    else {
262                            statusFlag &= ~_FLAG_ACK_REQUEST;
263                    }
264    
265                    _headerBufferArray[_INDEX_STATUS_FLAG] = statusFlag;
266            }
267    
268            protected void setSequenceId(long sequenceId) {
269                    BigEndianCodec.putLong(
270                            _headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
271            }
272    
273            protected boolean writeTo(GatheringByteChannel gatheringByteChannel)
274                    throws IOException {
275    
276                    if (_headerByteBuffer.hasRemaining()) {
277                            ByteBuffer[] byteBuffers = new ByteBuffer[2];
278    
279                            byteBuffers[0] = _headerByteBuffer;
280                            byteBuffers[1] = _dataByteBuffer;
281    
282                            gatheringByteChannel.write(byteBuffers);
283                    }
284                    else {
285                            gatheringByteChannel.write(_dataByteBuffer);
286                    }
287    
288                    if (_dataByteBuffer.hasRemaining()) {
289                            return false;
290                    }
291    
292                    _dataByteBuffer = null;
293    
294                    return true;
295            }
296    
297            protected Object attachment;
298            protected CompletionHandler<Object> completionHandler;
299            protected EnumSet<CompletionType> completionTypes;
300            protected long expireTime;
301            protected long timeout;
302    
303            private Datagram() {
304                    _headerByteBuffer = ByteBuffer.allocate(_HEADER_SIZE);
305    
306                    // Directly reference the interanl byte array for faster encoding and
307                    // decoding
308    
309                    _headerBufferArray = _headerByteBuffer.array();
310            }
311    
312            private static final ByteBuffer _EMPTY_BUFFER = ByteBuffer.allocate(0);
313    
314            private static final byte _FLAG_ACK_REQUEST = 1;
315    
316            private static final byte _FLAG_ACK_RESPONSE = 2;
317    
318            private static final byte _FLAG_REQUEST = 4;
319    
320            private static final byte _FLAG_RESPONSE = 8;
321    
322            private static final int _HEADER_SIZE = 14;
323    
324            private static final int _INDEX_DATA_SIZE = 10;
325    
326            private static final int _INDEX_DATA_TYPE = 9;
327    
328            private static final int _INDEX_SEQUENCE_ID = 1;
329    
330            private static final int _INDEX_STATUS_FLAG = 0;
331    
332            private ByteBuffer _dataByteBuffer;
333            private final byte[] _headerBufferArray;
334            private final ByteBuffer _headerByteBuffer;
335    
336    }