001
014
015 package com.liferay.portal.kernel.process.local;
016
017 import com.liferay.portal.kernel.concurrent.AsyncBroker;
018 import com.liferay.portal.kernel.concurrent.FutureListener;
019 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
020 import com.liferay.portal.kernel.process.ProcessCallable;
021 import com.liferay.portal.kernel.process.ProcessChannel;
022
023 import java.io.IOException;
024 import java.io.ObjectOutputStream;
025 import java.io.Serializable;
026
027 import java.util.Map;
028 import java.util.concurrent.Future;
029 import java.util.concurrent.atomic.AtomicLong;
030
031
034 public class LocalProcessChannel<T extends Serializable>
035 implements ProcessChannel<T> {
036
037 public LocalProcessChannel(
038 NoticeableFuture<T> noticeableFuture,
039 ObjectOutputStream objectOutputStream,
040 AsyncBroker<Long, Serializable> asyncBroker) {
041
042 _noticeableFuture = noticeableFuture;
043 _objectOutputStream = objectOutputStream;
044 _asyncBroker = asyncBroker;
045
046 _noticeableFuture.addFutureListener(new FutureListener<T>() {
047
048 @Override
049 public void complete(Future<T> future) {
050 try {
051 _objectOutputStream.close();
052 }
053 catch (IOException ioe) {
054 }
055 finally {
056 Map<Long, NoticeableFuture<Serializable>> map =
057 _asyncBroker.getOpenBids();
058
059 for (NoticeableFuture<Serializable> noticeableFuture :
060 map.values()) {
061
062 noticeableFuture.cancel(true);
063 }
064 }
065 }
066
067 });
068 }
069
070 @Override
071 public NoticeableFuture<T> getProcessNoticeableFuture() {
072 return _noticeableFuture;
073 }
074
075 @Override
076 public <V extends Serializable> NoticeableFuture<V> write(
077 ProcessCallable<V> processCallable) {
078
079 long id = _idGenerator.getAndIncrement();
080
081 NoticeableFuture<Serializable> noticeableFuture = _asyncBroker.post(id);
082
083 try {
084 _objectOutputStream.writeObject(
085 new RequestProcessCallable<V>(id, processCallable));
086
087 _objectOutputStream.flush();
088 }
089 catch (IOException ioe) {
090 _asyncBroker.takeWithException(id, ioe);
091 }
092
093 return (NoticeableFuture<V>)noticeableFuture;
094 }
095
096 private final AsyncBroker<Long, Serializable> _asyncBroker;
097 private final AtomicLong _idGenerator = new AtomicLong();
098 private final NoticeableFuture<T> _noticeableFuture;
099 private final ObjectOutputStream _objectOutputStream;
100
101 }