001
014
015 package com.liferay.portal.kernel.cache;
016
017 import com.liferay.portal.kernel.util.InitialThreadLocal;
018
019 import java.io.Serializable;
020
021 import java.util.Collections;
022 import java.util.Map;
023 import java.util.concurrent.ConcurrentHashMap;
024 import java.util.concurrent.ConcurrentMap;
025
026
029 public class AggregatedCacheListener<K extends Serializable, V>
030 implements CacheListener<K, V> {
031
032 public static boolean isRemoteInvoke() {
033 return _remoteInvokeThreadLocal.get();
034 }
035
036 public static boolean isSkipListener() {
037 return _skipListenerThreadLocal.get();
038 }
039
040 public static void setRemoteInvoke(boolean remoteInvoke) {
041 _remoteInvokeThreadLocal.set(remoteInvoke);
042 }
043
044 public static void setSkipListener(boolean skipListener) {
045 _skipListenerThreadLocal.set(skipListener);
046 }
047
048 public void addCacheListener(CacheListener<K, V> cacheListener) {
049 addCacheListener(cacheListener, CacheListenerScope.ALL);
050 }
051
052 public void addCacheListener(
053 CacheListener<K, V> cacheListener,
054 CacheListenerScope cacheListenerScope) {
055
056 _cacheListeners.putIfAbsent(cacheListener, cacheListenerScope);
057 }
058
059 public void clearAll() {
060 _cacheListeners.clear();
061 }
062
063 public Map<CacheListener<K, V>, CacheListenerScope> getCacheListeners() {
064 return Collections.unmodifiableMap(_cacheListeners);
065 }
066
067 @Override
068 public void notifyEntryEvicted(
069 PortalCache<K, V> portalCache, K key, V value, int timeToLive)
070 throws PortalCacheException {
071
072 if (_skipListenerThreadLocal.get()) {
073 return;
074 }
075
076 for (Map.Entry<CacheListener<K, V>, CacheListenerScope> entry :
077 _cacheListeners.entrySet()) {
078
079 CacheListener<K, V> cacheListener = entry.getKey();
080
081 if (_shouldDeliver(cacheListener, entry.getValue())) {
082 cacheListener.notifyEntryEvicted(
083 portalCache, key, value, timeToLive);
084 }
085 }
086 }
087
088 @Override
089 public void notifyEntryExpired(
090 PortalCache<K, V> portalCache, K key, V value, int timeToLive)
091 throws PortalCacheException {
092
093 if (_skipListenerThreadLocal.get()) {
094 return;
095 }
096
097 for (Map.Entry<CacheListener<K, V>, CacheListenerScope> entry :
098 _cacheListeners.entrySet()) {
099
100 CacheListener<K, V> cacheListener = entry.getKey();
101
102 if (_shouldDeliver(cacheListener, entry.getValue())) {
103 cacheListener.notifyEntryExpired(
104 portalCache, key, value, timeToLive);
105 }
106 }
107 }
108
109 @Override
110 public void notifyEntryPut(
111 PortalCache<K, V> portalCache, K key, V value, int timeToLive)
112 throws PortalCacheException {
113
114 if (_skipListenerThreadLocal.get()) {
115 return;
116 }
117
118 for (Map.Entry<CacheListener<K, V>, CacheListenerScope> entry :
119 _cacheListeners.entrySet()) {
120
121 CacheListener<K, V> cacheListener = entry.getKey();
122
123 if (_shouldDeliver(cacheListener, entry.getValue())) {
124 cacheListener.notifyEntryPut(
125 portalCache, key, value, timeToLive);
126 }
127 }
128 }
129
130 @Override
131 public void notifyEntryRemoved(
132 PortalCache<K, V> portalCache, K key, V value, int timeToLive)
133 throws PortalCacheException {
134
135 if (_skipListenerThreadLocal.get()) {
136 return;
137 }
138
139 for (Map.Entry<CacheListener<K, V>, CacheListenerScope> entry :
140 _cacheListeners.entrySet()) {
141
142 CacheListener<K, V> cacheListener = entry.getKey();
143
144 if (_shouldDeliver(cacheListener, entry.getValue())) {
145 cacheListener.notifyEntryRemoved(
146 portalCache, key, value, timeToLive);
147 }
148 }
149 }
150
151 @Override
152 public void notifyEntryUpdated(
153 PortalCache<K, V> portalCache, K key, V value, int timeToLive)
154 throws PortalCacheException {
155
156 if (_skipListenerThreadLocal.get()) {
157 return;
158 }
159
160 for (Map.Entry<CacheListener<K, V>, CacheListenerScope> entry :
161 _cacheListeners.entrySet()) {
162
163 CacheListener<K, V> cacheListener = entry.getKey();
164
165 if (_shouldDeliver(cacheListener, entry.getValue())) {
166 cacheListener.notifyEntryUpdated(
167 portalCache, key, value, timeToLive);
168 }
169 }
170 }
171
172 @Override
173 public void notifyRemoveAll(PortalCache<K, V> portalCache)
174 throws PortalCacheException {
175
176 if (_skipListenerThreadLocal.get()) {
177 return;
178 }
179
180 for (Map.Entry<CacheListener<K, V>, CacheListenerScope> entry :
181 _cacheListeners.entrySet()) {
182
183 CacheListener<K, V> cacheListener = entry.getKey();
184
185 if (_shouldDeliver(cacheListener, entry.getValue())) {
186 cacheListener.notifyRemoveAll(portalCache);
187 }
188 }
189 }
190
191 public void removeCacheListener(CacheListener<K, V> cacheListener) {
192 _cacheListeners.remove(cacheListener);
193 }
194
195 private boolean _shouldDeliver(
196 CacheListener<K, V> cacheListener,
197 CacheListenerScope cacheListenerScope) {
198
199 if (_remoteInvokeThreadLocal.get()) {
200 if (cacheListener instanceof CacheReplicator) {
201 return false;
202 }
203
204 if (cacheListenerScope.equals(CacheListenerScope.ALL) ||
205 cacheListenerScope.equals(CacheListenerScope.REMOTE)) {
206
207 return true;
208 }
209
210 return false;
211 }
212
213 if (cacheListenerScope.equals(CacheListenerScope.ALL) ||
214 cacheListenerScope.equals(CacheListenerScope.LOCAL)) {
215
216 return true;
217 }
218
219 return false;
220 }
221
222 private static final ThreadLocal<Boolean> _remoteInvokeThreadLocal =
223 new InitialThreadLocal<Boolean>(
224 AggregatedCacheListener.class + "._remoteInvokeThreadLocal", false);
225 private static final ThreadLocal<Boolean> _skipListenerThreadLocal =
226 new InitialThreadLocal<Boolean>(
227 AggregatedCacheListener.class + "._skipListenerThreadLocal", false);
228
229 private final ConcurrentMap<CacheListener<K, V>, CacheListenerScope>
230 _cacheListeners =
231 new ConcurrentHashMap<CacheListener<K, V>, CacheListenerScope>();
232
233 }