001
014
015 package com.liferay.portal.kernel.io;
016
017 import com.liferay.portal.kernel.util.StringPool;
018
019 import java.io.IOException;
020 import java.io.Reader;
021 import java.io.Writer;
022
023 import java.nio.CharBuffer;
024
025 import java.util.concurrent.locks.Condition;
026 import java.util.concurrent.locks.Lock;
027 import java.util.concurrent.locks.ReentrantLock;
028
029
/**
 * An in-memory character pipe backed by a fixed-size circular buffer.
 *
 * <p>
 * Characters written to the {@link Writer} returned by {@link #getWriter()}
 * become available from the {@link Reader} returned by {@link #getReader()}.
 * All buffer state ({@code buffer}, {@code count}, {@code readIndex},
 * {@code writeIndex}, {@code finished}) is accessed while holding
 * {@code bufferLock}. A read blocks on {@code notEmpty} while the buffer is
 * empty and the pipe is not finished; a write blocks on {@code notFull} while
 * the buffer is full.
 * </p>
 */
public class CharPipe {

    /**
     * Creates a pipe with the default buffer capacity (8192 characters).
     */
    public CharPipe() {
        this(_DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a pipe whose circular buffer holds <code>bufferSize</code>
     * characters.
     *
     * @param bufferSize the capacity of the internal buffer, in characters
     */
    public CharPipe(int bufferSize) {
        buffer = new char[bufferSize];
        count = 0;
        readIndex = 0;
        writeIndex = 0;
    }

    /**
     * Closes the writing side and marks the pipe as finished. Equivalent to
     * <code>close(false)</code>.
     */
    public void close() {
        close(false);
    }

    /**
     * Closes the pipe.
     *
     * <p>
     * The writer is always closed. When <code>force</code> is
     * <code>false</code>, the pipe is marked finished and waiting readers are
     * woken, so the reader can drain what is buffered and then observe end of
     * stream. When <code>force</code> is <code>true</code>, the reader is
     * closed as well and the buffer reference is dropped.
     * </p>
     *
     * @param force whether to also close the reader and release the buffer
     */
    public void close(boolean force) {
        _pipeWriter.close();

        if (force) {
            _pipeReader.close();

            // Release the buffer under the lock. Both ends are already
            // flagged closed, and every buffer access re-checks that flag
            // under this same lock, so no thread can dereference the null
            // buffer afterwards.

            bufferLock.lock();

            try {
                buffer = null;
            }
            finally {
                bufferLock.unlock();
            }
        }
        else {
            bufferLock.lock();

            try {
                finished = true;

                notEmpty.signalAll();
            }
            finally {
                bufferLock.unlock();
            }
        }
    }

    /**
     * Returns the reading end of this pipe. The same instance is returned on
     * every call.
     *
     * @return the reader
     */
    public Reader getReader() {
        return _pipeReader;
    }

    /**
     * Returns the writing end of this pipe. The same instance is returned on
     * every call.
     *
     * @return the writer
     */
    public Writer getWriter() {
        return _pipeWriter;
    }

    // Circular buffer storage; set to null by close(true)

    protected char[] buffer;

    // Guards every field below as well as the buffer contents

    protected Lock bufferLock = new ReentrantLock();

    // Number of characters currently buffered

    protected int count;

    // Set by close(false); once the buffer drains, reads return -1

    protected boolean finished;

    protected Condition notEmpty = bufferLock.newCondition();
    protected Condition notFull = bufferLock.newCondition();

    // Position of the next character to read / next free slot to write

    protected int readIndex;
    protected int writeIndex;

    private static final int _DEFAULT_BUFFER_SIZE = 1024 * 8;

    private final PipeReader _pipeReader = new PipeReader();
    private final PipeWriter _pipeWriter = new PipeWriter();

    /**
     * The reading end of the pipe. Mark and reset are not supported.
     */
    private class PipeReader extends Reader {

        /**
         * Marks this reader closed and wakes every thread waiting for data so
         * that it can fail with an {@link IOException}.
         */
        @Override
        public void close() {
            bufferLock.lock();

            try {
                _closed = true;

                notEmpty.signalAll();
            }
            finally {
                bufferLock.unlock();
            }
        }

        @Override
        public void mark(int readAheadLimit) throws IOException {
            throw new IOException("Mark is not supported");
        }

        @Override
        public boolean markSupported() {
            return false;
        }

        /**
         * Reads a single character, blocking while the buffer is empty and
         * the pipe is not finished.
         *
         * @return the character read, or <code>-1</code> if the pipe is
         *         finished and drained
         * @throws IOException if this reader is closed
         */
        @Override
        public int read() throws IOException {
            _ensureOpen();

            bufferLock.lock();

            try {
                if (waitUntilNotEmpty()) {
                    return -1;
                }

                int result = buffer[readIndex];

                _increaseReadIndex(1);

                return result;
            }
            finally {
                bufferLock.unlock();
            }
        }

        @Override
        public int read(char[] chars) throws IOException {
            return read(chars, 0, chars.length);
        }

        /**
         * Reads up to <code>length</code> characters into <code>chars</code>,
         * blocking only until at least one character is available.
         *
         * @param  chars the destination array
         * @param  offset the index in <code>chars</code> to start storing at
         * @param  length the maximum number of characters to read
         * @return the number of characters read, <code>0</code> if
         *         <code>length</code> is not positive, or <code>-1</code> if
         *         the pipe is finished and drained
         * @throws IOException if this reader is closed
         */
        @Override
        public int read(char[] chars, int offset, int length)
            throws IOException {

            _ensureOpen();

            if (length <= 0) {
                return 0;
            }

            bufferLock.lock();

            try {
                if (waitUntilNotEmpty()) {
                    return -1;
                }

                int read = Math.min(length, count);

                int tailLength = buffer.length - readIndex;

                if (tailLength >= read) {

                    // The requested range is contiguous

                    System.arraycopy(buffer, readIndex, chars, offset, read);
                }
                else {

                    // The requested range wraps around the end of the buffer

                    int headLength = read - tailLength;

                    System.arraycopy(
                        buffer, readIndex, chars, offset, tailLength);
                    System.arraycopy(
                        buffer, 0, chars, offset + tailLength, headLength);
                }

                _increaseReadIndex(read);

                return read;
            }
            finally {
                bufferLock.unlock();
            }
        }

        /**
         * Reads up to <code>charBuffer.remaining()</code> characters into
         * <code>charBuffer</code> through a temporary array.
         *
         * @param  charBuffer the destination buffer
         * @return the number of characters added, <code>0</code> if the
         *         buffer has no remaining space, or <code>-1</code> if the
         *         pipe is finished and drained
         * @throws IOException if this reader is closed
         */
        @Override
        public int read(CharBuffer charBuffer) throws IOException {
            _ensureOpen();

            int length = charBuffer.remaining();

            if (length <= 0) {
                return 0;
            }

            char[] tempBuffer = new char[length];

            int read = read(tempBuffer, 0, length);

            if (read > 0) {
                charBuffer.put(tempBuffer, 0, read);
            }

            return read;
        }

        /**
         * Returns whether at least one character is currently buffered.
         *
         * @return <code>true</code> if a read would not block for data
         * @throws IOException if this reader is closed
         */
        @Override
        public boolean ready() throws IOException {
            _ensureOpen();

            bufferLock.lock();

            try {
                return count > 0;
            }
            finally {
                bufferLock.unlock();
            }
        }

        @Override
        public void reset() throws IOException {
            throw new IOException("Reset is not supported");
        }

        /**
         * Skips up to <code>skip</code> characters by reading them into a
         * scratch buffer. Stops early only when the pipe is finished and
         * drained.
         *
         * @param  skip the number of characters to skip
         * @return the number of characters actually skipped
         * @throws IllegalArgumentException if <code>skip</code> is negative
         * @throws IOException if this reader is closed
         */
        @Override
        public long skip(long skip) throws IOException {
            if (skip < 0) {
                throw new IllegalArgumentException("Skip value is negative");
            }

            _ensureOpen();

            int skipBufferSize = (int)Math.min(skip, _MAX_SKIP_BUFFER_SIZE);

            bufferLock.lock();

            try {
                if ((_skipBuffer == null) ||
                    (_skipBuffer.length < skipBufferSize)) {

                    _skipBuffer = new char[skipBufferSize];
                }

                long remaining = skip;

                while (remaining > 0) {
                    int skipped = read(
                        _skipBuffer, 0,
                        (int)Math.min(remaining, skipBufferSize));

                    if (skipped == -1) {
                        break;
                    }

                    remaining -= skipped;
                }

                return skip - remaining;
            }
            finally {
                bufferLock.unlock();
            }
        }

        /**
         * Blocks until the buffer holds data or the pipe is finished. Must be
         * called with <code>bufferLock</code> held.
         *
         * @return <code>true</code> if the pipe is finished and drained (end
         *         of stream), <code>false</code> if data is available
         * @throws IOException if this reader is closed before or while
         *         waiting
         */
        protected boolean waitUntilNotEmpty() throws IOException {

            // Re-check under the lock before the first wait: close() signals
            // under this lock, so a close that happened after the caller's
            // unlocked check would otherwise be a missed wakeup.

            _ensureOpen();

            while ((count == 0) && !finished) {
                notEmpty.awaitUninterruptibly();

                _ensureOpen();
            }

            // The loop exits with data available or with the pipe finished,
            // so an empty buffer here means end of stream

            return count == 0;
        }

        private void _ensureOpen() throws IOException {
            if (_closed) {
                throw new IOException("Stream closed");
            }
        }

        private void _increaseReadIndex(int consumed) {
            readIndex += consumed;

            if (readIndex >= buffer.length) {
                readIndex -= buffer.length;
            }

            // Writers only wait while the buffer is completely full, so they
            // only need waking when it was full before this read

            if (count == buffer.length) {
                notFull.signalAll();
            }

            count -= consumed;
        }

        private static final int _MAX_SKIP_BUFFER_SIZE = 8192;

        private volatile boolean _closed;
        private char[] _skipBuffer;

    }

    /**
     * The writing end of the pipe. Writes block while the buffer is full.
     */
    private class PipeWriter extends Writer {

        @Override
        public Writer append(char c) throws IOException {
            write(c);

            return this;
        }

        /**
         * Appends <code>charSequence</code>, or the four characters
         * <code>"null"</code> when it is <code>null</code>.
         */
        @Override
        public Writer append(CharSequence charSequence) throws IOException {
            String string = String.valueOf(charSequence);

            write(string, 0, string.length());

            return this;
        }

        /**
         * Appends the subsequence <code>[start, end)</code> of
         * <code>charSequence</code>. A <code>null</code> sequence is treated
         * as if it contained the four characters <code>"null"</code>, as
         * specified by {@link Writer#append(CharSequence, int, int)}.
         */
        @Override
        public Writer append(CharSequence charSequence, int start, int end)
            throws IOException {

            CharSequence source = charSequence;

            if (source == null) {
                source = "null";
            }

            String string = source.subSequence(start, end).toString();

            write(string, 0, string.length());

            return this;
        }

        /**
         * Marks this writer closed and wakes every thread waiting for free
         * space so that it can fail with an {@link IOException}.
         */
        @Override
        public void close() {
            bufferLock.lock();

            try {
                _closed = true;

                notFull.signalAll();
            }
            finally {
                bufferLock.unlock();
            }
        }

        /**
         * Does nothing; written characters go straight into the shared
         * buffer.
         */
        @Override
        public void flush() {
        }

        @Override
        public void write(char[] chars) throws IOException {
            write(chars, 0, chars.length);
        }

        /**
         * Writes <code>length</code> characters from <code>chars</code>,
         * blocking whenever the buffer is full until space is available.
         *
         * @param  chars the source array
         * @param  offset the index of the first character to write
         * @param  length the number of characters to write
         * @throws IOException if this writer is closed before or during the
         *         write
         */
        @Override
        public void write(char[] chars, int offset, int length)
            throws IOException {

            _ensureOpen();

            if (length <= 0) {
                return;
            }

            bufferLock.lock();

            try {
                int remaining = length;

                while (remaining > 0) {
                    waitUntilNotFull();

                    int write = Math.min(remaining, buffer.length - count);

                    int sourceBegin = offset + length - remaining;

                    int tailLength = buffer.length - writeIndex;

                    if (tailLength >= write) {

                        // The free range is contiguous

                        System.arraycopy(
                            chars, sourceBegin, buffer, writeIndex, write);
                    }
                    else {

                        // The free range wraps around the end of the buffer

                        int headLength = write - tailLength;

                        System.arraycopy(
                            chars, sourceBegin, buffer, writeIndex,
                            tailLength);
                        System.arraycopy(
                            chars, sourceBegin + tailLength, buffer, 0,
                            headLength);
                    }

                    _increaseWriteIndex(write);

                    remaining -= write;
                }
            }
            finally {
                bufferLock.unlock();
            }
        }

        /**
         * Writes a single character, blocking while the buffer is full.
         *
         * @param  c the character to write (narrowed to <code>char</code>)
         * @throws IOException if this writer is closed
         */
        @Override
        public void write(int c) throws IOException {
            _ensureOpen();

            bufferLock.lock();

            try {
                waitUntilNotFull();

                buffer[writeIndex] = (char)c;

                _increaseWriteIndex(1);
            }
            finally {
                bufferLock.unlock();
            }
        }

        @Override
        public void write(String string) throws IOException {
            write(string, 0, string.length());
        }

        /**
         * Writes <code>length</code> characters of <code>string</code>
         * starting at <code>offset</code>, blocking whenever the buffer is
         * full until space is available.
         *
         * @param  string the source string
         * @param  offset the index of the first character to write
         * @param  length the number of characters to write
         * @throws IOException if this writer is closed before or during the
         *         write
         */
        @Override
        public void write(String string, int offset, int length)
            throws IOException {

            _ensureOpen();

            if (length <= 0) {
                return;
            }

            bufferLock.lock();

            try {
                int remaining = length;

                while (remaining > 0) {
                    waitUntilNotFull();

                    int write = Math.min(remaining, buffer.length - count);

                    int sourceBegin = offset + length - remaining;

                    int tailLength = buffer.length - writeIndex;

                    if (tailLength >= write) {

                        // The free range is contiguous

                        string.getChars(
                            sourceBegin, sourceBegin + write, buffer,
                            writeIndex);
                    }
                    else {

                        // The free range wraps around the end of the buffer

                        int headLength = write - tailLength;

                        string.getChars(
                            sourceBegin, sourceBegin + tailLength, buffer,
                            writeIndex);
                        string.getChars(
                            sourceBegin + tailLength,
                            sourceBegin + tailLength + headLength, buffer, 0);
                    }

                    _increaseWriteIndex(write);

                    remaining -= write;
                }
            }
            finally {
                bufferLock.unlock();
            }
        }

        /**
         * Blocks until the buffer has free space. Must be called with
         * <code>bufferLock</code> held.
         *
         * @throws IOException if this writer is closed before or while
         *         waiting
         */
        protected void waitUntilNotFull() throws IOException {

            // Re-check under the lock before the first wait: close() signals
            // under this lock, so a close that happened after the caller's
            // unlocked check would otherwise be a missed wakeup. This also
            // keeps the loop from touching a buffer released by close(true).

            _ensureOpen();

            while (count == buffer.length) {
                notFull.awaitUninterruptibly();

                _ensureOpen();
            }
        }

        private void _ensureOpen() throws IOException {
            if (_closed) {
                throw new IOException("Stream closed");
            }
        }

        private void _increaseWriteIndex(int produced) {
            writeIndex += produced;

            if (writeIndex >= buffer.length) {
                writeIndex -= buffer.length;
            }

            // Readers only wait while the buffer is empty, so they only need
            // waking when it was empty before this write

            if (count == 0) {
                notEmpty.signalAll();
            }

            count += produced;
        }

        private volatile boolean _closed;

    }

}