001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.io.unsync;
016    
017    import java.io.IOException;
018    import java.io.InputStream;
019    
020    /**
021     * <p>
022     * See http://issues.liferay.com/browse/LPS-6648.
023     * </p>
024     *
025     * @author Shuyang Zhou
026     */
027    public class UnsyncBufferedInputStream extends UnsyncFilterInputStream {
028    
029            public UnsyncBufferedInputStream(InputStream inputStream) {
030                    this(inputStream, _DEFAULT_BUFFER_SIZE);
031            }
032    
033            public UnsyncBufferedInputStream(InputStream inputStream, int size) {
034                    super(inputStream);
035    
036                    if (size <= 0) {
037                            throw new IllegalArgumentException("Size is less than 0");
038                    }
039    
040                    buffer = new byte[size];
041            }
042    
043            @Override
044            public int available() throws IOException {
045                    if (inputStream == null) {
046                            throw new IOException("Input stream is null");
047                    }
048    
049                    return inputStream.available() + (firstInvalidIndex - index);
050            }
051    
052            @Override
053            public void close() throws IOException {
054                    if (inputStream != null) {
055                            inputStream.close();
056    
057                            inputStream = null;
058                            buffer = null;
059                    }
060            }
061    
062            @Override
063            public void mark(int readLimit) {
064                    if (readLimit <= 0) {
065                            return;
066                    }
067    
068                    markLimitIndex = readLimit;
069    
070                    if (index > 0) {
071    
072                            int available = firstInvalidIndex - index;
073    
074                            if (available > 0) {
075    
076                                    // Shuffle mark beginning to buffer beginning
077    
078                                    System.arraycopy(buffer, index, buffer, 0, available);
079    
080                                    index = 0;
081    
082                                    firstInvalidIndex = available;
083                            }
084                            else {
085    
086                                    // Reset buffer states
087    
088                                    index = firstInvalidIndex = 0;
089                            }
090                    }
091            }
092    
093            @Override
094            public boolean markSupported() {
095                    return true;
096            }
097    
098            @Override
099            public int read() throws IOException {
100                    if (inputStream == null) {
101                            throw new IOException("Input stream is null");
102                    }
103    
104                    if (index >= firstInvalidIndex) {
105                            fillInBuffer();
106    
107                            if (index >= firstInvalidIndex) {
108                                    return -1;
109                            }
110                    }
111    
112                    return buffer[index++] & 0xff;
113            }
114    
115            @Override
116            public int read(byte[] bytes) throws IOException {
117                    return read(bytes, 0, bytes.length);
118            }
119    
120            @Override
121            public int read(byte[] bytes, int offset, int length) throws IOException {
122                    if (inputStream == null) {
123                            throw new IOException("Input stream is null");
124                    }
125    
126                    if (length <= 0) {
127                            return 0;
128                    }
129    
130                    int read = 0;
131    
132                    while (true) {
133    
134                            // Try to at least read some data
135    
136                            int currentRead = readOnce(bytes, offset + read, length - read);
137    
138                            if (currentRead <= 0) {
139                                    if (read == 0) {
140                                            read = currentRead;
141                                    }
142    
143                                    break;
144                            }
145    
146                            read += currentRead;
147    
148                            if ((read >= length) || (inputStream.available() <= 0)) {
149    
150                                    // Read enough or further reading may be blocked, stop reading
151    
152                                    break;
153                            }
154                    }
155    
156                    return read;
157            }
158    
159            @Override
160            public void reset() throws IOException {
161                    if (inputStream == null) {
162                            throw new IOException("Input stream is null");
163                    }
164    
165                    if (markLimitIndex < 0) {
166                            throw new IOException("Resetting to invalid mark");
167                    }
168    
169                    index = 0;
170            }
171    
172            @Override
173            public long skip(long skip) throws IOException {
174                    if (inputStream == null) {
175                            throw new IOException("Input stream is null");
176                    }
177    
178                    if (skip <= 0) {
179                            return 0;
180                    }
181    
182                    long available = firstInvalidIndex - index;
183    
184                    if (available <= 0) {
185                            if (markLimitIndex < 0) {
186    
187                                    // No mark required, skip the underlying input stream
188    
189                                    return inputStream.skip(skip);
190                            }
191                            else {
192    
193                                    // Mark required, save the skipped data
194    
195                                    fillInBuffer();
196    
197                                    available = firstInvalidIndex - index;
198    
199                                    if (available <= 0) {
200                                            return 0;
201                                    }
202                            }
203                    }
204    
205                    // Skip the data in buffer
206    
207                    if (available < skip) {
208                            skip = available;
209                    }
210    
211                    index += skip;
212    
213                    return skip;
214            }
215    
216            protected void fillInBuffer() throws IOException {
217                    if (markLimitIndex < 0) {
218    
219                            // No mark required, fill the buffer
220    
221                            index = firstInvalidIndex = 0;
222    
223                            int number = inputStream.read(buffer);
224    
225                            if (number > 0) {
226                                    firstInvalidIndex = number;
227                            }
228    
229                            return;
230                    }
231    
232                    // Mark required
233    
234                    if (index >= markLimitIndex) {
235    
236                            // Passed mark limit indexs, get rid of all cache data
237    
238                            markLimitIndex = -1;
239    
240                            index = firstInvalidIndex = 0;
241                    }
242                    else if (index == buffer.length) {
243    
244                            // Cannot get rid of cache data and there is no room to read in any
245                            // more data, so grow the buffer
246    
247                            int newBufferSize = buffer.length * 2;
248    
249                            if (newBufferSize > markLimitIndex) {
250                                    newBufferSize = markLimitIndex;
251                            }
252    
253                            byte[] newBuffer = new byte[newBufferSize];
254    
255                            System.arraycopy(buffer, 0, newBuffer, 0, buffer.length);
256    
257                            buffer = newBuffer;
258                    }
259    
260                    // Read underlying input stream since the buffer has more space
261    
262                    firstInvalidIndex = index;
263    
264                    int number = inputStream.read(buffer, index, buffer.length - index);
265    
266                    if (number > 0) {
267                            firstInvalidIndex += number;
268                    }
269            }
270    
271            protected int readOnce(byte[] bytes, int offset, int length)
272                    throws IOException {
273    
274                    int available = firstInvalidIndex - index;
275    
276                    if (available <= 0) {
277    
278                            // Buffer is empty, read from under input stream
279    
280                            if ((markLimitIndex < 0) && (length >= buffer.length)) {
281    
282                                    // No mark required, left read block is no less than buffer,
283                                    // read through buffer is inefficient, so directly read from
284                                    // underlying input stream
285    
286                                    return inputStream.read(bytes, offset, length);
287                            }
288                            else {
289    
290                                    // Mark is required, has to read through the buffer to remember
291                                    // data
292    
293                                    fillInBuffer();
294    
295                                    available = firstInvalidIndex - index;
296    
297                                    if (available <= 0) {
298                                            return -1;
299                                    }
300                            }
301                    }
302    
303                    if (length > available) {
304                            length = available;
305                    }
306    
307                    System.arraycopy(buffer, index, bytes, offset, length);
308    
309                    index += length;
310    
311                    return length;
312            }
313    
314            protected byte[] buffer;
315            protected int firstInvalidIndex;
316            protected int index;
317            protected int markLimitIndex = -1;
318    
319            private static final int _DEFAULT_BUFFER_SIZE = 8192;
320    
321    }