001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.cluster.BaseReceiver;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.Serializable;
022
023 import java.rmi.RemoteException;
024
025 import java.util.ArrayList;
026 import java.util.List;
027
028 import net.sf.ehcache.Cache;
029 import net.sf.ehcache.CacheException;
030 import net.sf.ehcache.CacheManager;
031 import net.sf.ehcache.Ehcache;
032 import net.sf.ehcache.Element;
033 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034 import net.sf.ehcache.distribution.CachePeer;
035 import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036
037 import org.jgroups.Address;
038 import org.jgroups.JChannel;
039 import org.jgroups.Message;
040 import org.jgroups.View;
041
042
049 public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050
051 public JGroupsManager(
052 CacheManager cacheManager, String clusterName,
053 String channelProperties) {
054
055 _cacheManager = cacheManager;
056
057 JChannel jChannel = null;
058
059 try {
060 jChannel = new JChannel(channelProperties);
061
062 jChannel.setReceiver(new EhcacheJGroupsReceiver());
063
064 jChannel.connect(clusterName);
065
066 if (_log.isInfoEnabled()) {
067 _log.info(
068 "Create a new channel with properties " +
069 jChannel.getProperties());
070 }
071 }
072 catch (Exception e) {
073 if (_log.isErrorEnabled()) {
074 _log.error("Unable to initialize channels", e);
075 }
076 }
077
078 _jChannel = jChannel;
079 }
080
081 @Override
082 public void dispose() throws CacheException {
083 if (_jChannel != null) {
084 _jChannel.close();
085 }
086 }
087
088 public Address getBusLocalAddress() {
089 return _jChannel.getAddress();
090 }
091
092 public List<Address> getBusMembership() {
093 BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
094
095 View view = baseReceiver.getView();
096
097 return view.getMembers();
098 }
099
100 @Override
101 @SuppressWarnings("rawtypes")
102 public List getElements(List list) {
103 return null;
104 }
105
106 @Override
107 public String getGuid() {
108 return null;
109 }
110
111 @Override
112 @SuppressWarnings("rawtypes")
113 public List getKeys() {
114 return null;
115 }
116
117 @Override
118 public String getName() {
119 return null;
120 }
121
122 @Override
123 public Element getQuiet(Serializable serializable) {
124 return null;
125 }
126
127 @Override
128 public String getScheme() {
129 return _SCHEME;
130 }
131
132 @Override
133 public long getTimeForClusterToForm() {
134 return 0;
135 }
136
137 @Override
138 public String getUrl() {
139 return null;
140 }
141
142 @Override
143 public String getUrlBase() {
144 return null;
145 }
146
147 public void handleNotification(Serializable serializable) {
148 if (serializable instanceof JGroupEventMessage) {
149 handleJGroupsNotification((JGroupEventMessage)serializable);
150 }
151 else if (serializable instanceof List<?>) {
152 List<?> valueList = (List<?>)serializable;
153
154 for (Object object : valueList) {
155 if (object instanceof JGroupEventMessage) {
156 handleJGroupsNotification((JGroupEventMessage)object);
157 }
158 }
159 }
160 }
161
162 @Override
163 public void init() {
164 }
165
166 @Override
167 public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
168 List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
169
170 cachePeers.add(this);
171
172 return cachePeers;
173 }
174
175 @Override
176 public void put(Element element) {
177 }
178
179 @Override
180 public void registerPeer(String string) {
181 }
182
183 @Override
184 public boolean remove(Serializable serializable) {
185 return false;
186 }
187
188 @Override
189 public void removeAll() {
190 }
191
192 @SuppressWarnings("rawtypes")
193 public void send(Address address, List eventMessages)
194 throws RemoteException {
195
196 ArrayList<JGroupEventMessage> jGroupEventMessages =
197 new ArrayList<JGroupEventMessage>();
198
199 for (Object eventMessage : eventMessages) {
200 if (eventMessage instanceof JGroupEventMessage) {
201 JGroupEventMessage jGroupEventMessage =
202 (JGroupEventMessage)eventMessage;
203
204 jGroupEventMessages.add(jGroupEventMessage);
205 }
206 else {
207 if (_log.isDebugEnabled()) {
208 _log.debug(
209 eventMessage + "is not a JGroupEventMessage type");
210 }
211 }
212 }
213
214 try {
215 _jChannel.send(address, jGroupEventMessages);
216 }
217 catch (Throwable t) {
218 throw new RemoteException(t.getMessage());
219 }
220 }
221
222 @Override
223 @SuppressWarnings("rawtypes")
224 public void send(List eventMessages) throws RemoteException {
225 send(null, eventMessages);
226 }
227
228 @Override
229 public void unregisterPeer(String string) {
230 }
231
232 protected void handleJGroupsNotification(
233 JGroupEventMessage jGroupEventMessage) {
234
235 Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
236
237 if (cache == null) {
238 return;
239 }
240
241 int event = jGroupEventMessage.getEvent();
242
243 if (event == JGroupEventMessage.REMOVE_ALL) {
244 cache.removeAll(true);
245
246 return;
247 }
248
249 Serializable key = jGroupEventMessage.getSerializableKey();
250
251 if (key == null) {
252 throw new NullPointerException("Key is null");
253 }
254
255 if ((event == JGroupEventMessage.REMOVE) &&
256 (cache.getQuiet(key) != null)) {
257
258 cache.remove(key, true);
259 }
260 else if (event == JGroupEventMessage.PUT) {
261 Element element = jGroupEventMessage.getElement();
262
263 Object value = element.getObjectValue();
264
265 if (value == null) {
266 cache.remove(key, true);
267 }
268 else {
269 cache.put(new Element(key, value), true);
270 }
271 }
272 }
273
274 private static final String _SCHEME = "JGroups";
275
276 private static final Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
277
278 private final CacheManager _cacheManager;
279 private final JChannel _jChannel;
280
281 private class EhcacheJGroupsReceiver extends BaseReceiver {
282
283 @Override
284 public void receive(Message message) {
285 Object object = message.getObject();
286
287 if (object == null) {
288 if (_log.isWarnEnabled()) {
289 _log.warn("Message content is null");
290 }
291
292 return;
293 }
294
295 if (object instanceof Serializable) {
296 handleNotification((Serializable)object);
297 }
298 else {
299 if (_log.isWarnEnabled()) {
300 _log.warn(
301 "Unable to process message content of type " +
302 object.getClass().getName());
303 }
304 }
305 }
306
307 }
308
309 }