001
014
015 package com.liferay.portal.kernel.process.local;
016
017 import com.liferay.portal.kernel.concurrent.AsyncBroker;
018 import com.liferay.portal.kernel.concurrent.FutureListener;
019 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
020 import com.liferay.portal.kernel.process.ProcessCallable;
021 import com.liferay.portal.kernel.process.ProcessChannel;
022
023 import java.io.IOException;
024 import java.io.ObjectOutputStream;
025 import java.io.Serializable;
026
027 import java.util.Map;
028 import java.util.concurrent.Future;
029 import java.util.concurrent.atomic.AtomicLong;
030
031
034 public class LocalProcessChannel<T extends Serializable>
035 implements ProcessChannel<T> {
036
037 public LocalProcessChannel(
038 NoticeableFuture<T> noticeableFuture,
039 ObjectOutputStream objectOutputStream,
040 AsyncBroker<Long, Serializable> asyncBroker) {
041
042 _noticeableFuture = noticeableFuture;
043 _objectOutputStream = objectOutputStream;
044 _asyncBroker = asyncBroker;
045
046 _noticeableFuture.addFutureListener(
047 new FutureListener<T>() {
048
049 @Override
050 public void complete(Future<T> future) {
051 try {
052 _objectOutputStream.close();
053 }
054 catch (IOException ioe) {
055 }
056 finally {
057 Map<Long, NoticeableFuture<Serializable>> map =
058 _asyncBroker.getOpenBids();
059
060 for (NoticeableFuture<Serializable> noticeableFuture :
061 map.values()) {
062
063 noticeableFuture.cancel(true);
064 }
065 }
066 }
067
068 });
069 }
070
071 @Override
072 public NoticeableFuture<T> getProcessNoticeableFuture() {
073 return _noticeableFuture;
074 }
075
076 @Override
077 public <V extends Serializable> NoticeableFuture<V> write(
078 ProcessCallable<V> processCallable) {
079
080 long id = _idGenerator.getAndIncrement();
081
082 NoticeableFuture<Serializable> noticeableFuture = _asyncBroker.post(id);
083
084 try {
085 _objectOutputStream.writeObject(
086 new RequestProcessCallable<V>(id, processCallable));
087
088 _objectOutputStream.flush();
089 }
090 catch (IOException ioe) {
091 _asyncBroker.takeWithException(id, ioe);
092 }
093
094 return (NoticeableFuture<V>)noticeableFuture;
095 }
096
097 private final AsyncBroker<Long, Serializable> _asyncBroker;
098 private final AtomicLong _idGenerator = new AtomicLong();
099 private final NoticeableFuture<T> _noticeableFuture;
100 private final ObjectOutputStream _objectOutputStream;
101
102 }