001    /**
002     * Copyright (c) 2000-2011 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.io;
016    
017    import com.liferay.portal.kernel.util.StringPool;
018    
019    import java.io.IOException;
020    import java.io.Reader;
021    import java.io.Writer;
022    
023    import java.nio.CharBuffer;
024    
025    import java.util.concurrent.locks.Condition;
026    import java.util.concurrent.locks.Lock;
027    import java.util.concurrent.locks.ReentrantLock;
028    
029    /**
030     * @author Shuyang Zhou
031     */
032    public class CharPipe {
033    
034            public CharPipe() {
035                    this(_DEFAULT_BUFFER_SIZE);
036            }
037    
038            public CharPipe(int bufferSize) {
039                    buffer = new char[bufferSize];
040                    count = 0;
041                    readIndex = 0;
042                    writeIndex = 0;
043            }
044    
045            public void close() {
046                    close(false);
047            }
048    
049            public void close(boolean force) {
050                    _pipeWriter.close();
051    
052                    if (force) {
053                            _pipeReader.close();
054                            buffer = null;
055                    }
056                    else {
057                            bufferLock.lock();
058    
059                            finished = true;
060    
061                            try {
062                                    notEmpty.signalAll();
063                            }
064                            finally {
065                                    bufferLock.unlock();
066                            }
067                    }
068            }
069    
070            public Reader getReader() {
071                    return _pipeReader;
072            }
073    
074            public Writer getWriter() {
075                    return _pipeWriter;
076            }
077    
078            protected char[] buffer;
079            protected Lock bufferLock = new ReentrantLock();
080            protected int count;
081            protected boolean finished;
082            protected Condition notEmpty = bufferLock.newCondition();
083            protected Condition notFull = bufferLock.newCondition();
084            protected int readIndex;
085            protected int writeIndex;
086    
087            private class PipeReader extends Reader {
088    
089                    @Override
090                    public void close() {
091                            bufferLock.lock();
092    
093                            try {
094                                    _closed = true;
095    
096                                    notEmpty.signalAll();
097                            }
098                            finally {
099                                    bufferLock.unlock();
100                            }
101                    }
102    
103                    @Override
104                    public void mark(int readAheadLimit) throws IOException {
105                            throw new IOException("Mark is not supported");
106                    }
107    
108                    @Override
109                    public boolean markSupported() {
110                            return false;
111                    }
112    
113                    @Override
114                    public int read() throws IOException {
115                            if (_closed) {
116                                    throw new IOException("Stream closed");
117                            }
118    
119                            bufferLock.lock();
120    
121                            try {
122                                    if (waitUntilNotEmpty()) {
123                                            return -1;
124                                    }
125    
126                                    char result = buffer[readIndex];
127    
128                                    increaseReadIndex(1);
129    
130                                    return result;
131                            }
132                            finally {
133                                    bufferLock.unlock();
134                            }
135                    }
136    
137                    @Override
138                    public int read(char[] chars) throws IOException {
139                            return read(chars, 0, chars.length);
140                    }
141    
142                    @Override
143                    public int read(char[] chars, int offset, int length)
144                            throws IOException {
145    
146                            if (_closed) {
147                                    throw new IOException("Stream closed");
148                            }
149    
150                            if (length <= 0) {
151                                    return 0;
152                            }
153    
154                            bufferLock.lock();
155    
156                            try {
157                                    if (waitUntilNotEmpty()) {
158                                            return -1;
159                                    }
160    
161                                    int read = length;
162    
163                                    if (length > count) {
164                                            read = count;
165                                    }
166    
167                                    if ((buffer.length - readIndex) >= read) {
168    
169                                            // One step read
170    
171                                            System.arraycopy(buffer, readIndex, chars, offset, read);
172                                    }
173                                    else {
174    
175                                            // Two step read
176    
177                                            int tailLength = buffer.length - readIndex;
178                                            int headLength = read - tailLength;
179    
180                                            System.arraycopy(
181                                                    buffer, readIndex, chars, offset, tailLength);
182                                            System.arraycopy(
183                                                    buffer, 0, chars, offset + tailLength, headLength);
184                                    }
185    
186                                    increaseReadIndex(read);
187    
188                                    return read;
189                            }
190                            finally {
191                                    bufferLock.unlock();
192                            }
193                    }
194    
195                    @Override
196                    public int read(CharBuffer charBuffer) throws IOException {
197                            if (_closed) {
198                                    throw new IOException("Stream closed");
199                            }
200    
201                            int length = charBuffer.remaining();
202    
203                            if (length <= 0) {
204                                    return 0;
205                            }
206    
207                            char[] tempBuffer = new char[length];
208    
209                            int read = read(tempBuffer, 0, length);
210    
211                            if (read > 0) {
212                                    charBuffer.put(tempBuffer, 0, read);
213                            }
214    
215                            return read;
216                    }
217    
218                    @Override
219                    public boolean ready() throws IOException {
220                            if (_closed) {
221                                    throw new IOException("Stream closed");
222                            }
223    
224                            bufferLock.lock();
225    
226                            try {
227                                    return count > 0;
228                            }
229                            finally {
230                                    bufferLock.unlock();
231                            }
232                    }
233    
234                    @Override
235                    public void reset() throws IOException {
236                            throw new IOException("Reset is not supported");
237                    }
238    
239                    @Override
240                    public long skip(long skip) throws IOException {
241                            if (skip < 0) {
242                                    throw new IllegalArgumentException("Skip value is negative");
243                            }
244    
245                            if (_closed) {
246                                    throw new IOException("Stream closed");
247                            }
248    
249                            int skipBufferSize = (int)Math.min(skip, _MAX_SKIP_BUFFER_SIZE);
250    
251                            bufferLock.lock();
252    
253                            try {
254                                    if ((_skipBuffer == null) ||
255                                            (_skipBuffer.length < skipBufferSize)) {
256    
257                                            _skipBuffer = new char[skipBufferSize];
258                                    }
259    
260                                    long remaining = skip;
261    
262                                    while (remaining > 0) {
263                                            int skipped = read(
264                                                    _skipBuffer, 0,
265                                                    (int)Math.min(remaining, skipBufferSize));
266    
267                                            if (skipped == -1) {
268                                                    break;
269                                            }
270    
271                                            remaining -= skipped;
272                                    }
273    
274                                    return skip - remaining;
275                            }
276                            finally {
277                                    bufferLock.unlock();
278                            }
279                    }
280    
281                    protected boolean waitUntilNotEmpty() throws IOException {
282                            while ((count == 0) && !finished) {
283                                    notEmpty.awaitUninterruptibly();
284    
285                                    if (_closed) {
286                                            throw new IOException("Stream closed");
287                                    }
288                            }
289    
290                            if ((count == 0) && finished) {
291                                    return true;
292                            }
293                            else {
294                                    return false;
295                            }
296                    }
297    
298                    private void increaseReadIndex(int consumed) {
299                            readIndex += consumed;
300    
301                            if (readIndex >= buffer.length) {
302                                    readIndex -= buffer.length;
303                            }
304    
305                            if (count == buffer.length) {
306                                    notFull.signalAll();
307                            }
308    
309                            count -= consumed;
310                    }
311    
312                    private static final int _MAX_SKIP_BUFFER_SIZE = 8192;
313    
314                    private volatile boolean _closed;
315                    private char[] _skipBuffer;
316    
317            }
318    
319            private class PipeWriter extends Writer {
320    
321                    @Override
322                    public Writer append(char c) throws IOException {
323                            write(c);
324    
325                            return this;
326                    }
327    
328                    @Override
329                    public Writer append(CharSequence charSequence) throws IOException {
330                            String string = null;
331    
332                            if (charSequence == null) {
333                                    string = StringPool.NULL;
334                            }
335                            else {
336                                    string = charSequence.toString();
337                            }
338    
339                            write(string, 0, string.length());
340    
341                            return this;
342                    }
343    
344                    @Override
345                    public Writer append(CharSequence charSequence, int start, int end)
346                            throws IOException {
347    
348                            String string = null;
349    
350                            if (charSequence == null) {
351                                    string = StringPool.NULL;
352                            }
353                            else {
354                                    string = charSequence.subSequence(start, end).toString();
355                            }
356    
357                            write(string, 0, string.length());
358    
359                            return this;
360                    }
361    
362                    @Override
363                    public void close() {
364                            bufferLock.lock();
365    
366                            try {
367                                    _closed = true;
368    
369                                    notFull.signalAll();
370                            }
371                            finally {
372                                    bufferLock.unlock();
373                            }
374                    }
375    
376                    @Override
377                    public void flush() {
378                    }
379    
380                    @Override
381                    public void write(char[] chars) throws IOException {
382                            write(chars, 0, chars.length);
383                    }
384    
385                    @Override
386                    public void write(char[] chars, int offset, int length)
387                            throws IOException {
388    
389                            if (_closed) {
390                                    throw new IOException("Stream closed");
391                            }
392    
393                            if (length <= 0) {
394                                    return;
395                            }
396    
397                            bufferLock.lock();
398    
399                            try {
400                                    int remaining = length;
401    
402                                    while (remaining > 0) {
403                                            waitUntilNotFull();
404    
405                                            int write = remaining;
406    
407                                            if (remaining > (buffer.length - count)) {
408                                                    write = buffer.length - count;
409                                            }
410    
411                                            int sourceBegin = offset + length - remaining;
412    
413                                            if ((buffer.length - writeIndex) >= write) {
414    
415                                                    // One step write
416    
417                                                    System.arraycopy(
418                                                            chars, sourceBegin, buffer, writeIndex, write);
419                                            }
420                                            else {
421    
422                                                    // Two step write
423    
424                                                    int tailLength = buffer.length - writeIndex;
425                                                    int headLength = write - tailLength;
426    
427                                                    System.arraycopy(
428                                                            chars, sourceBegin, buffer, writeIndex, tailLength);
429                                                    System.arraycopy(
430                                                            chars, sourceBegin + tailLength, buffer, 0,
431                                                            headLength);
432                                            }
433    
434                                            increaseWriteIndex(write);
435    
436                                            remaining -= write;
437                                    }
438                            }
439                            finally {
440                                    bufferLock.unlock();
441                            }
442                    }
443    
444                    @Override
445                    public void write(int c) throws IOException {
446                            if (_closed) {
447                                    throw new IOException("Stream closed");
448                            }
449    
450                            bufferLock.lock();
451    
452                            try {
453                                    waitUntilNotFull();
454    
455                                    buffer[writeIndex] = (char)c;
456    
457                                    increaseWriteIndex(1);
458                            }
459                            finally {
460                                    bufferLock.unlock();
461                            }
462                    }
463    
464                    @Override
465                    public void write(String string) throws IOException {
466                            write(string, 0, string.length());
467                    }
468    
469                    @Override
470                    public void write(String string, int offset, int length)
471                            throws IOException {
472    
473                            if (_closed) {
474                                    throw new IOException("Stream closed");
475                            }
476    
477                            if (length <= 0) {
478                                    return;
479                            }
480    
481                            bufferLock.lock();
482    
483                            try {
484                                    int remaining = length;
485    
486                                    while (remaining > 0) {
487                                            waitUntilNotFull();
488    
489                                            int write = remaining;
490    
491                                            if (remaining > buffer.length - count) {
492                                                    write = buffer.length - count;
493                                            }
494    
495                                            int sourceBegin = offset + length - remaining;
496    
497                                            if ((buffer.length - writeIndex) >= write) {
498    
499                                                    // One step write
500    
501                                                    string.getChars(
502                                                            sourceBegin, sourceBegin + write, buffer,
503                                                            writeIndex);
504                                            }
505                                            else {
506    
507                                                    // Two step write
508    
509                                                    int tailLength = buffer.length - writeIndex;
510                                                    int headLength = write - tailLength;
511    
512                                                    string.getChars(
513                                                            sourceBegin, sourceBegin + tailLength, buffer,
514                                                            writeIndex);
515                                                    string.getChars(
516                                                            sourceBegin + tailLength,
517                                                            sourceBegin + tailLength + headLength, buffer, 0);
518                                            }
519    
520                                            increaseWriteIndex(write);
521    
522                                            remaining -= write;
523                                    }
524                            }
525                            finally {
526                                    bufferLock.unlock();
527                            }
528                    }
529    
530                    protected void waitUntilNotFull() throws IOException {
531                            while (count == buffer.length) {
532                                    notFull.awaitUninterruptibly();
533    
534                                    if (_closed) {
535                                            throw new IOException("Stream closed");
536                                    }
537                            }
538                    }
539    
540                    private void increaseWriteIndex(int produced) {
541                            writeIndex += produced;
542    
543                            if (writeIndex >= buffer.length) {
544                                    writeIndex -= buffer.length;
545                            }
546    
547                            if (count == 0) {
548                                    notEmpty.signalAll();
549                            }
550    
551                            count += produced;
552                    }
553    
554                    private volatile boolean _closed;
555    
556            }
557    
558            private static final int _DEFAULT_BUFFER_SIZE = 1024 * 8;
559    
560            private PipeReader _pipeReader = new PipeReader();
561            private PipeWriter _pipeWriter = new PipeWriter();
562    
563    }