001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.cluster.BaseReceiver;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.Serializable;
022
023 import java.rmi.RemoteException;
024
025 import java.util.ArrayList;
026 import java.util.List;
027
028 import net.sf.ehcache.Cache;
029 import net.sf.ehcache.CacheException;
030 import net.sf.ehcache.CacheManager;
031 import net.sf.ehcache.Ehcache;
032 import net.sf.ehcache.Element;
033 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034 import net.sf.ehcache.distribution.CachePeer;
035 import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036
037 import org.jgroups.Address;
038 import org.jgroups.JChannel;
039 import org.jgroups.Message;
040 import org.jgroups.View;
041
042
049 public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050
051 public JGroupsManager(
052 CacheManager cacheManager, String clusterName,
053 String channelProperties) {
054
055 _cacheManager = cacheManager;
056
057 JChannel jChannel = null;
058
059 try {
060 jChannel = new JChannel(channelProperties);
061
062 jChannel.setReceiver(new EhcacheJGroupsReceiver());
063
064 jChannel.connect(clusterName);
065
066 if (_log.isInfoEnabled()) {
067 _log.info(
068 "Create a new channel with properties " +
069 jChannel.getProperties());
070 }
071 }
072 catch (Exception e) {
073 if (_log.isErrorEnabled()) {
074 _log.error("Unable to initialize channels", e);
075 }
076 }
077
078 _jChannel = jChannel;
079
080 BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
081
082 baseReceiver.openLatch();
083 }
084
085 @Override
086 public void dispose() throws CacheException {
087 if (_jChannel != null) {
088 _jChannel.close();
089 }
090 }
091
092 public Address getBusLocalAddress() {
093 return _jChannel.getAddress();
094 }
095
096 public List<Address> getBusMembership() {
097 BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
098
099 View view = baseReceiver.getView();
100
101 return view.getMembers();
102 }
103
104 @Override
105 @SuppressWarnings("rawtypes")
106 public List getElements(List list) {
107 return null;
108 }
109
110 @Override
111 public String getGuid() {
112 return null;
113 }
114
115 @Override
116 @SuppressWarnings("rawtypes")
117 public List getKeys() {
118 return null;
119 }
120
121 @Override
122 public String getName() {
123 return null;
124 }
125
126 @Override
127 public Element getQuiet(Serializable serializable) {
128 return null;
129 }
130
131 @Override
132 public String getScheme() {
133 return _SCHEME;
134 }
135
136 @Override
137 public long getTimeForClusterToForm() {
138 return 0;
139 }
140
141 @Override
142 public String getUrl() {
143 return null;
144 }
145
146 @Override
147 public String getUrlBase() {
148 return null;
149 }
150
151 public void handleNotification(Serializable serializable) {
152 if (serializable instanceof JGroupEventMessage) {
153 handleJGroupsNotification((JGroupEventMessage)serializable);
154 }
155 else if (serializable instanceof List<?>) {
156 List<?> valueList = (List<?>)serializable;
157
158 for (Object object : valueList) {
159 if (object instanceof JGroupEventMessage) {
160 handleJGroupsNotification((JGroupEventMessage)object);
161 }
162 }
163 }
164 }
165
166 @Override
167 public void init() {
168 }
169
170 @Override
171 public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
172 List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
173
174 cachePeers.add(this);
175
176 return cachePeers;
177 }
178
179 @Override
180 public void put(Element element) {
181 }
182
183 @Override
184 public void registerPeer(String string) {
185 }
186
187 @Override
188 public boolean remove(Serializable serializable) {
189 return false;
190 }
191
192 @Override
193 public void removeAll() {
194 }
195
196 @SuppressWarnings("rawtypes")
197 public void send(Address address, List eventMessages)
198 throws RemoteException {
199
200 ArrayList<JGroupEventMessage> jGroupEventMessages =
201 new ArrayList<JGroupEventMessage>();
202
203 for (Object eventMessage : eventMessages) {
204 if (eventMessage instanceof JGroupEventMessage) {
205 JGroupEventMessage jGroupEventMessage =
206 (JGroupEventMessage)eventMessage;
207
208 jGroupEventMessages.add(jGroupEventMessage);
209 }
210 else {
211 if (_log.isDebugEnabled()) {
212 _log.debug(
213 eventMessage + "is not a JGroupEventMessage type");
214 }
215 }
216 }
217
218 try {
219 _jChannel.send(address, jGroupEventMessages);
220 }
221 catch (Throwable t) {
222 throw new RemoteException(t.getMessage());
223 }
224 }
225
226 @Override
227 @SuppressWarnings("rawtypes")
228 public void send(List eventMessages) throws RemoteException {
229 send(null, eventMessages);
230 }
231
232 @Override
233 public void unregisterPeer(String string) {
234 }
235
236 protected void handleJGroupsNotification(
237 JGroupEventMessage jGroupEventMessage) {
238
239 Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
240
241 if (cache == null) {
242 return;
243 }
244
245 int event = jGroupEventMessage.getEvent();
246
247 if (event == JGroupEventMessage.REMOVE_ALL) {
248 cache.removeAll(true);
249
250 return;
251 }
252
253 Serializable key = jGroupEventMessage.getSerializableKey();
254
255 if (key == null) {
256 throw new NullPointerException("Key is null");
257 }
258
259 if ((event == JGroupEventMessage.REMOVE) &&
260 (cache.getQuiet(key) != null)) {
261
262 cache.remove(key, true);
263 }
264 else if (event == JGroupEventMessage.PUT) {
265 Element element = jGroupEventMessage.getElement();
266
267 Object value = element.getObjectValue();
268
269 if (value == null) {
270 cache.remove(key, true);
271 }
272 else {
273 cache.put(new Element(key, value), true);
274 }
275 }
276 }
277
278 private static final String _SCHEME = "JGroups";
279
280 private static final Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
281
282 private final CacheManager _cacheManager;
283 private final JChannel _jChannel;
284
285 private class EhcacheJGroupsReceiver extends BaseReceiver {
286
287 @Override
288 protected void doReceive(Message message) {
289 Object object = message.getObject();
290
291 if (object == null) {
292 if (_log.isWarnEnabled()) {
293 _log.warn("Message content is null");
294 }
295
296 return;
297 }
298
299 if (object instanceof Serializable) {
300 handleNotification((Serializable)object);
301 }
302 else {
303 if (_log.isWarnEnabled()) {
304 _log.warn(
305 "Unable to process message content of type " +
306 object.getClass().getName());
307 }
308 }
309 }
310
311 }
312
313 }