001
014
015 package com.liferay.portal.kernel.io;
016
017 import com.liferay.portal.kernel.util.StringPool;
018
019 import java.io.IOException;
020 import java.io.Reader;
021 import java.io.Writer;
022
023 import java.nio.CharBuffer;
024
025 import java.util.concurrent.locks.Condition;
026 import java.util.concurrent.locks.Lock;
027 import java.util.concurrent.locks.ReentrantLock;
028
029
032 public class CharPipe {
033
034 public CharPipe() {
035 this(_DEFAULT_BUFFER_SIZE);
036 }
037
038 public CharPipe(int bufferSize) {
039 buffer = new char[bufferSize];
040 count = 0;
041 readIndex = 0;
042 writeIndex = 0;
043 }
044
045 public void close() {
046 close(false);
047 }
048
049 public void close(boolean force) {
050 _pipeWriter.close();
051
052 if (force) {
053 _pipeReader.close();
054 buffer = null;
055 }
056 else {
057 bufferLock.lock();
058
059 finished = true;
060
061 try {
062 notEmpty.signalAll();
063 }
064 finally {
065 bufferLock.unlock();
066 }
067 }
068 }
069
070 public Reader getReader() {
071 return _pipeReader;
072 }
073
074 public Writer getWriter() {
075 return _pipeWriter;
076 }
077
078 protected char[] buffer;
079 protected Lock bufferLock = new ReentrantLock();
080 protected int count;
081 protected boolean finished;
082 protected Condition notEmpty = bufferLock.newCondition();
083 protected Condition notFull = bufferLock.newCondition();
084 protected int readIndex;
085 protected int writeIndex;
086
087 private class PipeReader extends Reader {
088
089 @Override
090 public void close() {
091 bufferLock.lock();
092
093 try {
094 _closed = true;
095
096 notEmpty.signalAll();
097 }
098 finally {
099 bufferLock.unlock();
100 }
101 }
102
103 @Override
104 public void mark(int readAheadLimit) throws IOException {
105 throw new IOException("Mark is not supported");
106 }
107
108 @Override
109 public boolean markSupported() {
110 return false;
111 }
112
113 @Override
114 public int read() throws IOException {
115 if (_closed) {
116 throw new IOException("Stream closed");
117 }
118
119 bufferLock.lock();
120
121 try {
122 if (waitUntilNotEmpty()) {
123 return -1;
124 }
125
126 char result = buffer[readIndex];
127
128 increaseReadIndex(1);
129
130 return result;
131 }
132 finally {
133 bufferLock.unlock();
134 }
135 }
136
137 @Override
138 public int read(char[] chars) throws IOException {
139 return read(chars, 0, chars.length);
140 }
141
142 @Override
143 public int read(char[] chars, int offset, int length)
144 throws IOException {
145
146 if (_closed) {
147 throw new IOException("Stream closed");
148 }
149
150 if (length <= 0) {
151 return 0;
152 }
153
154 bufferLock.lock();
155
156 try {
157 if (waitUntilNotEmpty()) {
158 return -1;
159 }
160
161 int read = length;
162
163 if (length > count) {
164 read = count;
165 }
166
167 if ((buffer.length - readIndex) >= read) {
168
169
170
171 System.arraycopy(buffer, readIndex, chars, offset, read);
172 }
173 else {
174
175
176
177 int tailLength = buffer.length - readIndex;
178 int headLength = read - tailLength;
179
180 System.arraycopy(
181 buffer, readIndex, chars, offset, tailLength);
182 System.arraycopy(
183 buffer, 0, chars, offset + tailLength, headLength);
184 }
185
186 increaseReadIndex(read);
187
188 return read;
189 }
190 finally {
191 bufferLock.unlock();
192 }
193 }
194
195 @Override
196 public int read(CharBuffer charBuffer) throws IOException {
197 if (_closed) {
198 throw new IOException("Stream closed");
199 }
200
201 int length = charBuffer.remaining();
202
203 if (length <= 0) {
204 return 0;
205 }
206
207 char[] tempBuffer = new char[length];
208
209 int read = read(tempBuffer, 0, length);
210
211 if (read > 0) {
212 charBuffer.put(tempBuffer, 0, read);
213 }
214
215 return read;
216 }
217
218 @Override
219 public boolean ready() throws IOException {
220 if (_closed) {
221 throw new IOException("Stream closed");
222 }
223
224 bufferLock.lock();
225
226 try {
227 return count > 0;
228 }
229 finally {
230 bufferLock.unlock();
231 }
232 }
233
234 @Override
235 public void reset() throws IOException {
236 throw new IOException("Reset is not supported");
237 }
238
239 @Override
240 public long skip(long skip) throws IOException {
241 if (skip < 0) {
242 throw new IllegalArgumentException("Skip value is negative");
243 }
244
245 if (_closed) {
246 throw new IOException("Stream closed");
247 }
248
249 int skipBufferSize = (int)Math.min(skip, _MAX_SKIP_BUFFER_SIZE);
250
251 bufferLock.lock();
252
253 try {
254 if ((_skipBuffer == null) ||
255 (_skipBuffer.length < skipBufferSize)) {
256
257 _skipBuffer = new char[skipBufferSize];
258 }
259
260 long remaining = skip;
261
262 while (remaining > 0) {
263 int skipped = read(
264 _skipBuffer, 0,
265 (int)Math.min(remaining, skipBufferSize));
266
267 if (skipped == -1) {
268 break;
269 }
270
271 remaining -= skipped;
272 }
273
274 return skip - remaining;
275 }
276 finally {
277 bufferLock.unlock();
278 }
279 }
280
281 protected boolean waitUntilNotEmpty() throws IOException {
282 while ((count == 0) && !finished) {
283 notEmpty.awaitUninterruptibly();
284
285 if (_closed) {
286 throw new IOException("Stream closed");
287 }
288 }
289
290 if ((count == 0) && finished) {
291 return true;
292 }
293 else {
294 return false;
295 }
296 }
297
298 private void increaseReadIndex(int consumed) {
299 readIndex += consumed;
300
301 if (readIndex >= buffer.length) {
302 readIndex -= buffer.length;
303 }
304
305 if (count == buffer.length) {
306 notFull.signalAll();
307 }
308
309 count -= consumed;
310 }
311
312 private static final int _MAX_SKIP_BUFFER_SIZE = 8192;
313
314 private volatile boolean _closed;
315 private char[] _skipBuffer;
316
317 }
318
319 private class PipeWriter extends Writer {
320
321 @Override
322 public Writer append(char c) throws IOException {
323 write(c);
324
325 return this;
326 }
327
328 @Override
329 public Writer append(CharSequence charSequence) throws IOException {
330 String string = null;
331
332 if (charSequence == null) {
333 string = StringPool.NULL;
334 }
335 else {
336 string = charSequence.toString();
337 }
338
339 write(string, 0, string.length());
340
341 return this;
342 }
343
344 @Override
345 public Writer append(CharSequence charSequence, int start, int end)
346 throws IOException {
347
348 String string = null;
349
350 if (charSequence == null) {
351 string = StringPool.NULL;
352 }
353 else {
354 string = charSequence.subSequence(start, end).toString();
355 }
356
357 write(string, 0, string.length());
358
359 return this;
360 }
361
362 @Override
363 public void close() {
364 bufferLock.lock();
365
366 try {
367 _closed = true;
368
369 notFull.signalAll();
370 }
371 finally {
372 bufferLock.unlock();
373 }
374 }
375
376 @Override
377 public void flush() {
378 }
379
380 @Override
381 public void write(char[] chars) throws IOException {
382 write(chars, 0, chars.length);
383 }
384
385 @Override
386 public void write(char[] chars, int offset, int length)
387 throws IOException {
388
389 if (_closed) {
390 throw new IOException("Stream closed");
391 }
392
393 if (length <= 0) {
394 return;
395 }
396
397 bufferLock.lock();
398
399 try {
400 int remaining = length;
401
402 while (remaining > 0) {
403 waitUntilNotFull();
404
405 int write = remaining;
406
407 if (remaining > (buffer.length - count)) {
408 write = buffer.length - count;
409 }
410
411 int sourceBegin = offset + length - remaining;
412
413 if ((buffer.length - writeIndex) >= write) {
414
415
416
417 System.arraycopy(
418 chars, sourceBegin, buffer, writeIndex, write);
419 }
420 else {
421
422
423
424 int tailLength = buffer.length - writeIndex;
425 int headLength = write - tailLength;
426
427 System.arraycopy(
428 chars, sourceBegin, buffer, writeIndex, tailLength);
429 System.arraycopy(
430 chars, sourceBegin + tailLength, buffer, 0,
431 headLength);
432 }
433
434 increaseWriteIndex(write);
435
436 remaining -= write;
437 }
438 }
439 finally {
440 bufferLock.unlock();
441 }
442 }
443
444 @Override
445 public void write(int c) throws IOException {
446 if (_closed) {
447 throw new IOException("Stream closed");
448 }
449
450 bufferLock.lock();
451
452 try {
453 waitUntilNotFull();
454
455 buffer[writeIndex] = (char)c;
456
457 increaseWriteIndex(1);
458 }
459 finally {
460 bufferLock.unlock();
461 }
462 }
463
464 @Override
465 public void write(String string) throws IOException {
466 write(string, 0, string.length());
467 }
468
469 @Override
470 public void write(String string, int offset, int length)
471 throws IOException {
472
473 if (_closed) {
474 throw new IOException("Stream closed");
475 }
476
477 if (length <= 0) {
478 return;
479 }
480
481 bufferLock.lock();
482
483 try {
484 int remaining = length;
485
486 while (remaining > 0) {
487 waitUntilNotFull();
488
489 int write = remaining;
490
491 if (remaining > buffer.length - count) {
492 write = buffer.length - count;
493 }
494
495 int sourceBegin = offset + length - remaining;
496
497 if ((buffer.length - writeIndex) >= write) {
498
499
500
501 string.getChars(
502 sourceBegin, sourceBegin + write, buffer,
503 writeIndex);
504 }
505 else {
506
507
508
509 int tailLength = buffer.length - writeIndex;
510 int headLength = write - tailLength;
511
512 string.getChars(
513 sourceBegin, sourceBegin + tailLength, buffer,
514 writeIndex);
515 string.getChars(
516 sourceBegin + tailLength,
517 sourceBegin + tailLength + headLength, buffer, 0);
518 }
519
520 increaseWriteIndex(write);
521
522 remaining -= write;
523 }
524 }
525 finally {
526 bufferLock.unlock();
527 }
528 }
529
530 protected void waitUntilNotFull() throws IOException {
531 while (count == buffer.length) {
532 notFull.awaitUninterruptibly();
533
534 if (_closed) {
535 throw new IOException("Stream closed");
536 }
537 }
538 }
539
540 private void increaseWriteIndex(int produced) {
541 writeIndex += produced;
542
543 if (writeIndex >= buffer.length) {
544 writeIndex -= buffer.length;
545 }
546
547 if (count == 0) {
548 notEmpty.signalAll();
549 }
550
551 count += produced;
552 }
553
554 private volatile boolean _closed;
555
556 }
557
558 private static final int _DEFAULT_BUFFER_SIZE = 1024 * 8;
559
560 private PipeReader _pipeReader = new PipeReader();
561 private PipeWriter _pipeWriter = new PipeWriter();
562
563 }