001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.memory.FinalizeAction;
020 import com.liferay.portal.kernel.memory.FinalizeManager;
021 import com.liferay.portal.kernel.util.ReflectionUtil;
022
023 import java.lang.ref.Reference;
024 import java.lang.reflect.Field;
025
026 import java.util.Collections;
027 import java.util.Map;
028 import java.util.concurrent.ConcurrentMap;
029 import java.util.concurrent.Future;
030
031
034 public class AsyncBroker<K, V> {
035
036 public Map<K, NoticeableFuture<V>> getOpenBids() {
037 return Collections.<K, NoticeableFuture<V>>unmodifiableMap(
038 _defaultNoticeableFutures);
039 }
040
041 public NoticeableFuture<V> post(final K key) {
042 DefaultNoticeableFuture<V> defaultNoticeableFuture =
043 new DefaultNoticeableFuture<V>();
044
045 DefaultNoticeableFuture<V> previousDefaultNoticeableFuture =
046 _defaultNoticeableFutures.putIfAbsent(key, defaultNoticeableFuture);
047
048 if (previousDefaultNoticeableFuture != null) {
049 return previousDefaultNoticeableFuture;
050 }
051
052 defaultNoticeableFuture.addFutureListener(
053 new FutureListener<V>() {
054
055 @Override
056 public void complete(Future<V> future) {
057 _defaultNoticeableFutures.remove(key);
058 }
059
060 });
061
062 if (_REFERENT_FIELD != null) {
063 FinalizeManager.register(
064 defaultNoticeableFuture, new CancellationFinalizeAction(key),
065 FinalizeManager.PHANTOM_REFERENCE_FACTORY);
066 }
067
068 return defaultNoticeableFuture;
069 }
070
071 public NoticeableFuture<V> take(K key) {
072 return _defaultNoticeableFutures.remove(key);
073 }
074
075 public boolean takeWithException(K key, Throwable throwable) {
076 DefaultNoticeableFuture<V> defaultNoticeableFuture =
077 _defaultNoticeableFutures.remove(key);
078
079 if (defaultNoticeableFuture == null) {
080 return false;
081 }
082
083 defaultNoticeableFuture.setException(throwable);
084
085 return true;
086 }
087
088 public boolean takeWithResult(K key, V result) {
089 DefaultNoticeableFuture<V> defaultNoticeableFuture =
090 _defaultNoticeableFutures.remove(key);
091
092 if (defaultNoticeableFuture == null) {
093 return false;
094 }
095
096 defaultNoticeableFuture.set(result);
097
098 return true;
099 }
100
101 private static final Field _REFERENT_FIELD;
102
103 private static final Log _log = LogFactoryUtil.getLog(AsyncBroker.class);
104
105 static {
106 Field referentField = null;
107
108 try {
109 referentField = ReflectionUtil.getDeclaredField(
110 Reference.class, "referent");
111 }
112 catch (Throwable t) {
113 if (_log.isWarnEnabled()) {
114 _log.warn(
115 "Cancellation of orphaned noticeable futures is disabled " +
116 "because the JVM does not support phantom reference " +
117 "resurrection",
118 t);
119 }
120 }
121
122 _REFERENT_FIELD = referentField;
123 }
124
125 private final ConcurrentMap<K, DefaultNoticeableFuture<V>>
126 _defaultNoticeableFutures =
127 new ConcurrentReferenceValueHashMap<K, DefaultNoticeableFuture<V>>(
128 FinalizeManager.WEAK_REFERENCE_FACTORY);
129
130 private static class CancellationFinalizeAction implements FinalizeAction {
131
132 public CancellationFinalizeAction(Object key) {
133 _key = key;
134 }
135
136 @Override
137 public void doFinalize(final Reference<?> reference) {
138 try {
139 NoticeableFuture<?> noticeableFuture =
140 (NoticeableFuture<?>)_REFERENT_FIELD.get(reference);
141
142 if (noticeableFuture.cancel(true) && _log.isWarnEnabled()) {
143 _log.warn(
144 "Cancelled orphan noticeable future " +
145 noticeableFuture + " with key " + _key);
146 }
147 }
148 catch (Exception e) {
149 _log.error("Unable to access referent of " + reference, e);
150 }
151 }
152
153 private final Object _key;
154
155 }
156
157 }