001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.io;
016    
017    import com.liferay.portal.kernel.util.StringPool;
018    
019    import java.io.IOException;
020    import java.io.Reader;
021    import java.io.Writer;
022    
023    import java.nio.CharBuffer;
024    
025    import java.util.concurrent.locks.Condition;
026    import java.util.concurrent.locks.Lock;
027    import java.util.concurrent.locks.ReentrantLock;
028    
/**
 * A bounded, in-memory character pipe backed by a circular <code>char</code>
 * buffer. Characters written to {@link #getWriter()} become available, in
 * order, from {@link #getReader()}.
 *
 * <p>
 * Every access to the buffer, its indexes and its fill count happens while
 * holding {@link #bufferLock}. A read blocks while the buffer is empty and the
 * pipe is not finished; a write blocks while the buffer is full. Waits are
 * uninterruptible.
 * </p>
 *
 * @author Shuyang Zhou
 */
public class CharPipe {

	/**
	 * Creates a pipe with the default buffer capacity (8192 characters).
	 */
	public CharPipe() {
		this(_DEFAULT_BUFFER_SIZE);
	}

	/**
	 * Creates a pipe with the given buffer capacity.
	 *
	 * @param  bufferSize the number of characters the pipe can hold before
	 *         writes block
	 * @throws IllegalArgumentException if <code>bufferSize</code> is not
	 *         positive; a zero capacity buffer is always "full", so any write
	 *         to it would block forever
	 */
	public CharPipe(int bufferSize) {
		if (bufferSize <= 0) {
			throw new IllegalArgumentException(
				"Buffer size must be positive: " + bufferSize);
		}

		buffer = new char[bufferSize];
		count = 0;
		readIndex = 0;
		writeIndex = 0;
	}

	/**
	 * Closes the pipe gracefully. Equivalent to <code>close(false)</code>.
	 */
	public void close() {
		close(false);
	}

	/**
	 * Closes the pipe.
	 *
	 * <p>
	 * The writer is always closed. When <code>force</code> is
	 * <code>false</code>, the pipe is marked finished: the reader can still
	 * drain the characters already buffered and then sees end of stream. When
	 * <code>force</code> is <code>true</code>, the reader is closed as well and
	 * the buffer is released, so buffered characters are discarded.
	 * </p>
	 *
	 * @param force whether to also close the reader and drop the buffer
	 */
	public void close(boolean force) {

		// The whole transition happens under the lock (the nested close calls
		// reacquire it reentrantly) so that no reader or writer can observe a
		// released buffer while it still believes its stream is open.

		bufferLock.lock();

		try {
			_pipeWriter.close();

			if (force) {
				_pipeReader.close();

				buffer = null;
			}
			else {
				finished = true;

				notEmpty.signalAll();
			}
		}
		finally {
			bufferLock.unlock();
		}
	}

	/**
	 * Returns the reading end of the pipe. The same instance is returned on
	 * every call.
	 *
	 * @return the reading end of the pipe
	 */
	public Reader getReader() {
		return _pipeReader;
	}

	/**
	 * Returns the writing end of the pipe. The same instance is returned on
	 * every call.
	 *
	 * @return the writing end of the pipe
	 */
	public Writer getWriter() {
		return _pipeWriter;
	}

	protected char[] buffer;
	protected Lock bufferLock = new ReentrantLock();
	protected int count;
	protected boolean finished;
	protected Condition notEmpty = bufferLock.newCondition();
	protected Condition notFull = bufferLock.newCondition();
	protected int readIndex;
	protected int writeIndex;

	private static final int _DEFAULT_BUFFER_SIZE = 1024 * 8;

	// Text appended in place of a null character sequence, as specified by
	// java.io.Writer#append

	private static final String _NULL = "null";

	private final PipeReader _pipeReader = new PipeReader();
	private final PipeWriter _pipeWriter = new PipeWriter();

	/**
	 * The reading end of the pipe. Mark and reset are not supported.
	 */
	private class PipeReader extends Reader {

		/**
		 * Marks this reader as closed and wakes up every thread waiting for
		 * characters so that it can fail with an <code>IOException</code>.
		 * Only the <code>notEmpty</code> condition is signalled here; threads
		 * waiting on <code>notFull</code> are not woken by this method.
		 */
		@Override
		public void close() {
			bufferLock.lock();

			try {
				_closed = true;

				notEmpty.signalAll();
			}
			finally {
				bufferLock.unlock();
			}
		}

		@Override
		public void mark(int readAheadLimit) throws IOException {
			throw new IOException("Mark is not supported");
		}

		@Override
		public boolean markSupported() {
			return false;
		}

		/**
		 * Reads a single character, blocking until one is available or the
		 * pipe is finished.
		 *
		 * @return the character read, or <code>-1</code> if the pipe is
		 *         finished and drained
		 * @throws IOException if this reader is closed
		 */
		@Override
		public int read() throws IOException {
			if (_closed) {
				throw new IOException("Stream closed");
			}

			bufferLock.lock();

			try {
				if (waitUntilNotEmpty()) {
					return -1;
				}

				char result = buffer[readIndex];

				_increaseReadIndex(1);

				return result;
			}
			finally {
				bufferLock.unlock();
			}
		}

		@Override
		public int read(char[] chars) throws IOException {
			return read(chars, 0, chars.length);
		}

		/**
		 * Reads up to <code>length</code> characters. Blocks until at least
		 * one character is available, then returns whatever is buffered
		 * without waiting for more.
		 *
		 * @param  chars the destination array
		 * @param  offset the index in <code>chars</code> of the first
		 *         character stored
		 * @param  length the maximum number of characters to read
		 * @return the number of characters read, <code>0</code> if
		 *         <code>length</code> is not positive, or <code>-1</code> if
		 *         the pipe is finished and drained
		 * @throws IOException if this reader is closed
		 */
		@Override
		public int read(char[] chars, int offset, int length)
			throws IOException {

			if (_closed) {
				throw new IOException("Stream closed");
			}

			if (length <= 0) {
				return 0;
			}

			bufferLock.lock();

			try {
				if (waitUntilNotEmpty()) {
					return -1;
				}

				int chunkLength = Math.min(length, count);

				// The readable region may wrap around the end of the array:
				// copy the part up to the end first, then the rest from the
				// start.

				int tailLength = Math.min(
					chunkLength, buffer.length - readIndex);

				System.arraycopy(buffer, readIndex, chars, offset, tailLength);

				if (tailLength < chunkLength) {
					System.arraycopy(
						buffer, 0, chars, offset + tailLength,
						chunkLength - tailLength);
				}

				_increaseReadIndex(chunkLength);

				return chunkLength;
			}
			finally {
				bufferLock.unlock();
			}
		}

		/**
		 * Reads characters into the given buffer through a temporary array
		 * sized to the buffer's remaining capacity.
		 *
		 * @param  charBuffer the destination buffer
		 * @return the number of characters added, <code>0</code> if the buffer
		 *         has no remaining capacity, or <code>-1</code> if the pipe is
		 *         finished and drained
		 * @throws IOException if this reader is closed
		 */
		@Override
		public int read(CharBuffer charBuffer) throws IOException {
			if (_closed) {
				throw new IOException("Stream closed");
			}

			int length = charBuffer.remaining();

			if (length <= 0) {
				return 0;
			}

			char[] tempBuffer = new char[length];

			int read = read(tempBuffer, 0, length);

			if (read > 0) {
				charBuffer.put(tempBuffer, 0, read);
			}

			return read;
		}

		/**
		 * Returns whether at least one character is buffered, so that the next
		 * read will not block.
		 *
		 * @return <code>true</code> if the buffer is not empty
		 * @throws IOException if this reader is closed
		 */
		@Override
		public boolean ready() throws IOException {
			if (_closed) {
				throw new IOException("Stream closed");
			}

			bufferLock.lock();

			try {
				return count > 0;
			}
			finally {
				bufferLock.unlock();
			}
		}

		@Override
		public void reset() throws IOException {
			throw new IOException("Reset is not supported");
		}

		/**
		 * Skips characters by reading them into a scratch buffer. Blocks until
		 * the requested number of characters has been skipped or the pipe is
		 * finished and drained.
		 *
		 * @param  skip the number of characters to skip
		 * @return the number of characters actually skipped
		 * @throws IllegalArgumentException if <code>skip</code> is negative
		 * @throws IOException if this reader is closed
		 */
		@Override
		public long skip(long skip) throws IOException {
			if (skip < 0) {
				throw new IllegalArgumentException("Skip value is negative");
			}

			if (_closed) {
				throw new IOException("Stream closed");
			}

			int skipBufferSize = (int)Math.min(skip, _MAX_SKIP_BUFFER_SIZE);

			// The lock is held across the whole operation; the nested read
			// calls reacquire it reentrantly.

			bufferLock.lock();

			try {
				if ((_skipBuffer == null) ||
					(_skipBuffer.length < skipBufferSize)) {

					_skipBuffer = new char[skipBufferSize];
				}

				long remaining = skip;

				while (remaining > 0) {
					int skipped = read(
						_skipBuffer, 0,
						(int)Math.min(remaining, skipBufferSize));

					if (skipped == -1) {
						break;
					}

					remaining -= skipped;
				}

				return skip - remaining;
			}
			finally {
				bufferLock.unlock();
			}
		}

		/**
		 * Blocks until there is something to read or the pipe is finished.
		 * Must be called while holding <code>bufferLock</code>.
		 *
		 * <p>
		 * The closed flag is checked before every wait, not only after waking
		 * up. Otherwise a thread that took the lock just after the reader was
		 * closed would miss the close signal and could wait forever, or touch
		 * a buffer that a forced close has already released.
		 * </p>
		 *
		 * @return <code>true</code> if the pipe is finished and drained (end
		 *         of stream), <code>false</code> if characters are available
		 * @throws IOException if this reader is closed
		 */
		protected boolean waitUntilNotEmpty() throws IOException {
			while (true) {
				if (_closed) {
					throw new IOException("Stream closed");
				}

				if (count > 0) {
					return false;
				}

				if (finished) {
					return true;
				}

				notEmpty.awaitUninterruptibly();
			}
		}

		/**
		 * Advances the read position by <code>consumed</code> characters,
		 * wrapping around the end of the buffer, and wakes up writers if the
		 * buffer was full. Must be called while holding
		 * <code>bufferLock</code>.
		 */
		private void _increaseReadIndex(int consumed) {
			readIndex += consumed;

			if (readIndex >= buffer.length) {
				readIndex -= buffer.length;
			}

			// Writers only wait while the buffer is completely full, so that
			// is the only state in which a signal is needed.

			if (count == buffer.length) {
				notFull.signalAll();
			}

			count -= consumed;
		}

		private static final int _MAX_SKIP_BUFFER_SIZE = 8192;

		private volatile boolean _closed;
		private char[] _skipBuffer;

	}

	/**
	 * The writing end of the pipe.
	 */
	private class PipeWriter extends Writer {

		@Override
		public Writer append(char c) throws IOException {
			write(c);

			return this;
		}

		/**
		 * Appends the given character sequence. A <code>null</code> sequence
		 * is appended as the four characters <code>"null"</code>.
		 *
		 * @param  charSequence the characters to append, may be
		 *         <code>null</code>
		 * @return this writer
		 * @throws IOException if this writer is closed
		 */
		@Override
		public Writer append(CharSequence charSequence) throws IOException {
			String string = _NULL;

			if (charSequence != null) {
				string = charSequence.toString();
			}

			write(string, 0, string.length());

			return this;
		}

		/**
		 * Appends a sub-sequence of the given character sequence. As specified
		 * by {@link Writer#append(CharSequence, int, int)}, a <code>null</code>
		 * sequence is treated as if it contained the four characters
		 * <code>"null"</code>, and <code>start</code> and <code>end</code> are
		 * applied to that text.
		 *
		 * @param  charSequence the source characters, may be <code>null</code>
		 * @param  start the index of the first character to append
		 * @param  end the index after the last character to append
		 * @return this writer
		 * @throws IndexOutOfBoundsException if the range is invalid for the
		 *         sequence
		 * @throws IOException if this writer is closed
		 */
		@Override
		public Writer append(CharSequence charSequence, int start, int end)
			throws IOException {

			CharSequence source = charSequence;

			if (source == null) {
				source = _NULL;
			}

			String string = source.subSequence(start, end).toString();

			write(string, 0, string.length());

			return this;
		}

		/**
		 * Marks this writer as closed and wakes up every thread blocked on a
		 * full buffer so that it can fail with an <code>IOException</code>.
		 */
		@Override
		public void close() {
			bufferLock.lock();

			try {
				_closed = true;

				notFull.signalAll();
			}
			finally {
				bufferLock.unlock();
			}
		}

		/**
		 * Does nothing; written characters are placed directly in the shared
		 * buffer.
		 */
		@Override
		public void flush() {
		}

		@Override
		public void write(char[] chars) throws IOException {
			write(chars, 0, chars.length);
		}

		/**
		 * Writes <code>length</code> characters from the array, blocking
		 * whenever the buffer is full until all of them have been stored.
		 *
		 * @param  chars the source array
		 * @param  offset the index of the first character to write
		 * @param  length the number of characters to write; nothing is written
		 *         if it is not positive
		 * @throws IOException if this writer is closed, including while
		 *         blocked on a full buffer
		 */
		@Override
		public void write(char[] chars, int offset, int length)
			throws IOException {

			if (_closed) {
				throw new IOException("Stream closed");
			}

			if (length <= 0) {
				return;
			}

			bufferLock.lock();

			try {
				int written = 0;

				while (written < length) {
					waitUntilNotFull();

					int chunkLength = Math.min(
						length - written, buffer.length - count);
					int sourceBegin = offset + written;

					// The free region may wrap around the end of the array:
					// fill up to the end first, then continue from the start.

					int tailLength = Math.min(
						chunkLength, buffer.length - writeIndex);

					System.arraycopy(
						chars, sourceBegin, buffer, writeIndex, tailLength);

					if (tailLength < chunkLength) {
						System.arraycopy(
							chars, sourceBegin + tailLength, buffer, 0,
							chunkLength - tailLength);
					}

					_increaseWriteIndex(chunkLength);

					written += chunkLength;
				}
			}
			finally {
				bufferLock.unlock();
			}
		}

		/**
		 * Writes a single character, blocking while the buffer is full.
		 *
		 * @param  c the character to write; only the low 16 bits are used
		 * @throws IOException if this writer is closed
		 */
		@Override
		public void write(int c) throws IOException {
			if (_closed) {
				throw new IOException("Stream closed");
			}

			bufferLock.lock();

			try {
				waitUntilNotFull();

				buffer[writeIndex] = (char)c;

				_increaseWriteIndex(1);
			}
			finally {
				bufferLock.unlock();
			}
		}

		@Override
		public void write(String string) throws IOException {
			write(string, 0, string.length());
		}

		/**
		 * Writes <code>length</code> characters of the string, blocking
		 * whenever the buffer is full until all of them have been stored.
		 *
		 * @param  string the source string
		 * @param  offset the index of the first character to write
		 * @param  length the number of characters to write; nothing is written
		 *         if it is not positive
		 * @throws IOException if this writer is closed, including while
		 *         blocked on a full buffer
		 */
		@Override
		public void write(String string, int offset, int length)
			throws IOException {

			if (_closed) {
				throw new IOException("Stream closed");
			}

			if (length <= 0) {
				return;
			}

			bufferLock.lock();

			try {
				int written = 0;

				while (written < length) {
					waitUntilNotFull();

					int chunkLength = Math.min(
						length - written, buffer.length - count);
					int sourceBegin = offset + written;

					// Same wrap-around handling as the char[] variant

					int tailLength = Math.min(
						chunkLength, buffer.length - writeIndex);

					string.getChars(
						sourceBegin, sourceBegin + tailLength, buffer,
						writeIndex);

					if (tailLength < chunkLength) {
						string.getChars(
							sourceBegin + tailLength, sourceBegin + chunkLength,
							buffer, 0);
					}

					_increaseWriteIndex(chunkLength);

					written += chunkLength;
				}
			}
			finally {
				bufferLock.unlock();
			}
		}

		/**
		 * Blocks until the buffer has room for at least one character. Must be
		 * called while holding <code>bufferLock</code>.
		 *
		 * <p>
		 * The closed flag is checked before every wait and before the buffer
		 * is inspected, so a writer that took the lock just after the pipe was
		 * closed fails immediately instead of missing the close signal or
		 * touching a buffer released by a forced close.
		 * </p>
		 *
		 * @throws IOException if this writer is closed
		 */
		protected void waitUntilNotFull() throws IOException {
			while (true) {
				if (_closed) {
					throw new IOException("Stream closed");
				}

				if (count < buffer.length) {
					return;
				}

				notFull.awaitUninterruptibly();
			}
		}

		/**
		 * Advances the write position by <code>produced</code> characters,
		 * wrapping around the end of the buffer, and wakes up readers if the
		 * buffer was empty. Must be called while holding
		 * <code>bufferLock</code>.
		 */
		private void _increaseWriteIndex(int produced) {
			writeIndex += produced;

			if (writeIndex >= buffer.length) {
				writeIndex -= buffer.length;
			}

			// Readers only wait while the buffer is empty, so that is the only
			// state in which a signal is needed.

			if (count == 0) {
				notEmpty.signalAll();
			}

			count += produced;
		}

		private volatile boolean _closed;

	}

}