001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband;
016    
017    import com.liferay.portal.kernel.io.BigEndianCodec;
018    import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
019    import com.liferay.portal.kernel.util.StringBundler;
020    import com.liferay.portal.kernel.util.StringPool;
021    
022    import java.io.EOFException;
023    import java.io.IOException;
024    
025    import java.nio.ByteBuffer;
026    import java.nio.channels.GatheringByteChannel;
027    import java.nio.channels.ScatteringByteChannel;
028    
029    import java.util.EnumSet;
030    
031    /**
032     * @author Shuyang Zhou
033     */
034    public class Datagram {
035    
036            public static Datagram createRequestDatagram(byte type, byte[] data) {
037                    return createRequestDatagram(type, ByteBuffer.wrap(data));
038            }
039    
040            public static Datagram createRequestDatagram(
041                    byte type, ByteBuffer dataByteBuffer) {
042    
043                    Datagram datagram = new Datagram();
044    
045                    // Status flag
046    
047                    datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_REQUEST;
048    
049                    // Request datagram does not set the sequence ID
050    
051                    // Data type
052    
053                    datagram._headerBufferArray[_INDEX_DATA_TYPE] = type;
054    
055                    // Data size
056    
057                    BigEndianCodec.putInt(
058                            datagram._headerBufferArray, _INDEX_DATA_SIZE,
059                            dataByteBuffer.remaining());
060    
061                    // Data chunk
062    
063                    datagram._dataByteBuffer = dataByteBuffer;
064    
065                    return datagram;
066            }
067    
068            public static Datagram createResponseDatagram(
069                    Datagram requestDatagram, byte[] data) {
070    
071                    return createResponseDatagram(requestDatagram, ByteBuffer.wrap(data));
072            }
073    
074            public static Datagram createResponseDatagram(
075                    Datagram requestDatagram, ByteBuffer byteBuffer) {
076    
077                    Datagram datagram = new Datagram();
078    
079                    // Status flag
080    
081                    datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_RESPONSE;
082    
083                    // Sequence ID
084    
085                    BigEndianCodec.putLong(
086                            datagram._headerBufferArray, _INDEX_SEQUENCE_ID,
087                            requestDatagram.getSequenceId());
088    
089                    // Response datagram does not set the data type
090    
091                    // Data size
092    
093                    BigEndianCodec.putInt(
094                            datagram._headerBufferArray, _INDEX_DATA_SIZE,
095                            byteBuffer.remaining());
096    
097                    // Data chunk
098    
099                    datagram._dataByteBuffer = byteBuffer;
100    
101                    return datagram;
102            }
103    
104            public ByteBuffer getDataByteBuffer() {
105                    return _dataByteBuffer;
106            }
107    
108            public byte getType() {
109                    return _headerBufferArray[_INDEX_DATA_TYPE];
110            }
111    
112            @Override
113            public String toString() {
114                    StringBundler sb = new StringBundler(11);
115    
116                    sb.append("{dataChunk=");
117    
118                    ByteBuffer byteBuffer = _dataByteBuffer;
119    
120                    if (byteBuffer == null) {
121                            sb.append(StringPool.NULL);
122                    }
123                    else {
124                            sb.append(byteBuffer.toString());
125                    }
126    
127                    sb.append(", dataSize=");
128                    sb.append(BigEndianCodec.getInt(_headerBufferArray, _INDEX_DATA_SIZE));
129                    sb.append(", dataType=");
130                    sb.append(_headerBufferArray[_INDEX_DATA_TYPE]);
131                    sb.append(", sequenceId=");
132                    sb.append(
133                            BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID));
134                    sb.append(", statusFlag=");
135                    sb.append(_headerBufferArray[_INDEX_STATUS_FLAG]);
136                    sb.append("}");
137    
138                    return sb.toString();
139            }
140    
141            protected static Datagram createACKResponseDatagram(long sequenceId) {
142                    Datagram datagram = new Datagram();
143    
144                    // Status flag
145    
146                    datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_ACK_RESPONSE;
147    
148                    // Sequence ID
149    
150                    BigEndianCodec.putLong(
151                            datagram._headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
152    
153                    // ACK response datagram does not set the data type
154    
155                    // Data size
156    
157                    BigEndianCodec.putInt(datagram._headerBufferArray, _INDEX_DATA_SIZE, 0);
158    
159                    // Data chunk
160    
161                    datagram._dataByteBuffer = _EMPTY_BUFFER;
162    
163                    return datagram;
164            }
165    
166            protected static Datagram createReceiveDatagram() {
167                    return new Datagram();
168            }
169    
170            protected long getSequenceId() {
171                    return BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID);
172            }
173    
174            protected boolean isAckRequest() {
175                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
176    
177                    if ((statusFlag & _FLAG_ACK_REQUEST) != 0) {
178                            return true;
179                    }
180                    else {
181                            return false;
182                    }
183            }
184    
185            protected boolean isAckResponse() {
186                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
187    
188                    if ((statusFlag & _FLAG_ACK_RESPONSE) != 0) {
189                            return true;
190                    }
191                    else {
192                            return false;
193                    }
194            }
195    
196            protected boolean isRequest() {
197                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
198    
199                    if ((statusFlag & _FLAG_REQUEST) != 0) {
200                            return true;
201                    }
202                    else {
203                            return false;
204                    }
205            }
206    
207            protected boolean isResponse() {
208                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
209    
210                    if ((statusFlag & _FLAG_RESPONSE) != 0) {
211                            return true;
212                    }
213                    else {
214                            return false;
215                    }
216            }
217    
218            protected boolean readFrom(ScatteringByteChannel scatteringByteChannel)
219                    throws IOException {
220    
221                    if (_headerByteBuffer.hasRemaining()) {
222                            if (scatteringByteChannel.read(_headerByteBuffer) == -1) {
223                                    throw new EOFException();
224                            }
225    
226                            if (_headerByteBuffer.hasRemaining()) {
227                                    return false;
228                            }
229                            else {
230                                    int dataSize = BigEndianCodec.getInt(
231                                            _headerBufferArray, _INDEX_DATA_SIZE);
232    
233                                    if (dataSize == 0) {
234                                            _dataByteBuffer = _EMPTY_BUFFER;
235    
236                                            return true;
237                                    }
238                                    else {
239                                            _dataByteBuffer = ByteBuffer.allocate(dataSize);
240                                    }
241                            }
242                    }
243    
244                    if (scatteringByteChannel.read(_dataByteBuffer) == -1) {
245                            throw new EOFException();
246                    }
247    
248                    if (_dataByteBuffer.hasRemaining()) {
249                            return false;
250                    }
251                    else {
252                            _dataByteBuffer.flip();
253    
254                            return true;
255                    }
256            }
257    
258            protected void setAckRequest(boolean ackRequest) {
259                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
260    
261                    if (ackRequest) {
262                            statusFlag |= _FLAG_ACK_REQUEST;
263                    }
264                    else {
265                            statusFlag &= ~_FLAG_ACK_REQUEST;
266                    }
267    
268                    _headerBufferArray[_INDEX_STATUS_FLAG] = statusFlag;
269            }
270    
271            protected void setSequenceId(long sequenceId) {
272                    BigEndianCodec.putLong(
273                            _headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
274            }
275    
276            protected boolean writeTo(GatheringByteChannel gatheringByteChannel)
277                    throws IOException {
278    
279                    if (_headerByteBuffer.hasRemaining()) {
280                            ByteBuffer[] byteBuffers = new ByteBuffer[2];
281    
282                            byteBuffers[0] = _headerByteBuffer;
283                            byteBuffers[1] = _dataByteBuffer;
284    
285                            gatheringByteChannel.write(byteBuffers);
286                    }
287                    else {
288                            gatheringByteChannel.write(_dataByteBuffer);
289                    }
290    
291                    if (_dataByteBuffer.hasRemaining()) {
292                            return false;
293                    }
294                    else {
295                            _dataByteBuffer = null;
296    
297                            return true;
298                    }
299            }
300    
301            protected Object attachment;
302            protected CompletionHandler<Object> completionHandler;
303            protected EnumSet<CompletionType> completionTypes;
304            protected long expireTime;
305            protected long timeout;
306    
307            private Datagram() {
308                    _headerByteBuffer = ByteBuffer.allocate(_HEADER_SIZE);
309    
310                    // Directly reference the interanl byte array for faster encoding and
311                    // decoding
312    
313                    _headerBufferArray = _headerByteBuffer.array();
314            }
315    
316            private static final ByteBuffer _EMPTY_BUFFER = ByteBuffer.allocate(0);
317    
318            private static final byte _FLAG_ACK_REQUEST = 1;
319    
320            private static final byte _FLAG_ACK_RESPONSE = 2;
321    
322            private static final byte _FLAG_REQUEST = 4;
323    
324            private static final byte _FLAG_RESPONSE = 8;
325    
326            private static final int _HEADER_SIZE = 14;
327    
328            private static final int _INDEX_DATA_SIZE = 10;
329    
330            private static final int _INDEX_DATA_TYPE = 9;
331    
332            private static final int _INDEX_SEQUENCE_ID = 1;
333    
334            private static final int _INDEX_STATUS_FLAG = 0;
335    
336            private ByteBuffer _dataByteBuffer;
337            private final byte[] _headerBufferArray;
338            private final ByteBuffer _headerByteBuffer;
339    
340    }