001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.cluster.BaseReceiver;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.Serializable;
022
023 import java.rmi.RemoteException;
024
025 import java.util.ArrayList;
026 import java.util.List;
027
028 import net.sf.ehcache.Cache;
029 import net.sf.ehcache.CacheException;
030 import net.sf.ehcache.CacheManager;
031 import net.sf.ehcache.Ehcache;
032 import net.sf.ehcache.Element;
033 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034 import net.sf.ehcache.distribution.CachePeer;
035 import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036
037 import org.jgroups.Address;
038 import org.jgroups.JChannel;
039 import org.jgroups.Message;
040 import org.jgroups.View;
041
042
049 public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050
051 public JGroupsManager(
052 CacheManager cacheManager, String clusterName,
053 String channelProperties) {
054
055 try {
056 _jChannel = new JChannel(channelProperties);
057
058 _jChannel.setReceiver(new EhcacheJGroupsReceiver());
059
060 _jChannel.connect(clusterName);
061
062 if (_log.isInfoEnabled()) {
063 _log.info(
064 "Create a new channel with properties " +
065 _jChannel.getProperties());
066 }
067 }
068 catch (Exception e) {
069 if (_log.isErrorEnabled()) {
070 _log.error("Unable to initialize channels", e);
071 }
072 }
073
074 _cacheManager = cacheManager;
075 }
076
077 @Override
078 public void dispose() throws CacheException {
079 if (_jChannel != null) {
080 _jChannel.close();
081 }
082 }
083
084 public Address getBusLocalAddress() {
085 return _jChannel.getAddress();
086 }
087
088 public List<Address> getBusMembership() {
089 BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
090
091 View view = baseReceiver.getView();
092
093 return view.getMembers();
094 }
095
096 @Override
097 @SuppressWarnings("rawtypes")
098 public List getElements(List list) {
099 return null;
100 }
101
102 @Override
103 public String getGuid() {
104 return null;
105 }
106
107 @Override
108 @SuppressWarnings("rawtypes")
109 public List getKeys() {
110 return null;
111 }
112
113 @Override
114 public String getName() {
115 return null;
116 }
117
118 @Override
119 public Element getQuiet(Serializable serializable) {
120 return null;
121 }
122
123 @Override
124 public String getScheme() {
125 return _SCHEME;
126 }
127
128 @Override
129 public long getTimeForClusterToForm() {
130 return 0;
131 }
132
133 @Override
134 public String getUrl() {
135 return null;
136 }
137
138 @Override
139 public String getUrlBase() {
140 return null;
141 }
142
143 public void handleNotification(Serializable serializable) {
144 if (serializable instanceof JGroupEventMessage) {
145 handleJGroupsNotification((JGroupEventMessage)serializable);
146 }
147 else if (serializable instanceof List<?>) {
148 List<?> valueList = (List<?>)serializable;
149
150 for (Object object : valueList) {
151 if (object instanceof JGroupEventMessage) {
152 handleJGroupsNotification((JGroupEventMessage)object);
153 }
154 }
155 }
156 }
157
158 @Override
159 public void init() {
160 }
161
162 @Override
163 public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
164 List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
165
166 cachePeers.add(this);
167
168 return cachePeers;
169 }
170
171 @Override
172 public void put(Element element) {
173 }
174
175 @Override
176 public void registerPeer(String string) {
177 }
178
179 @Override
180 public boolean remove(Serializable serializable) {
181 return false;
182 }
183
184 @Override
185 public void removeAll() {
186 }
187
188 @SuppressWarnings("rawtypes")
189 public void send(Address address, List eventMessages)
190 throws RemoteException {
191
192 ArrayList<JGroupEventMessage> jGroupEventMessages =
193 new ArrayList<JGroupEventMessage>();
194
195 for (Object eventMessage : eventMessages) {
196 if (eventMessage instanceof JGroupEventMessage) {
197 JGroupEventMessage jGroupEventMessage =
198 (JGroupEventMessage)eventMessage;
199
200 jGroupEventMessages.add(jGroupEventMessage);
201 }
202 else {
203 if (_log.isDebugEnabled()) {
204 _log.debug(
205 eventMessage + "is not a JGroupEventMessage type");
206 }
207 }
208 }
209
210 try {
211 _jChannel.send(address, jGroupEventMessages);
212 }
213 catch (Throwable t) {
214 throw new RemoteException(t.getMessage());
215 }
216 }
217
218 @Override
219 @SuppressWarnings("rawtypes")
220 public void send(List eventMessages) throws RemoteException {
221 send(null, eventMessages);
222 }
223
224 @Override
225 public void unregisterPeer(String string) {
226 }
227
228 protected void handleJGroupsNotification(
229 JGroupEventMessage jGroupEventMessage) {
230
231 Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
232
233 if (cache == null) {
234 return;
235 }
236
237 int event = jGroupEventMessage.getEvent();
238 Serializable key = jGroupEventMessage.getSerializableKey();
239
240 if ((event == JGroupEventMessage.REMOVE) &&
241 (cache.getQuiet(key) != null)) {
242
243 cache.remove(key, true);
244 }
245 else if (event == JGroupEventMessage.REMOVE_ALL) {
246 cache.removeAll(true);
247 }
248 else if (event == JGroupEventMessage.PUT) {
249 Element element = jGroupEventMessage.getElement();
250
251 cache.put(new Element(key, element.getObjectValue()), true);
252 }
253 }
254
255 private static final String _SCHEME = "JGroups";
256
257 private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
258
259 private CacheManager _cacheManager;
260 private JChannel _jChannel;
261
262 private class EhcacheJGroupsReceiver extends BaseReceiver {
263
264 @Override
265 public void receive(Message message) {
266 Object object = message.getObject();
267
268 if (object == null) {
269 if (_log.isWarnEnabled()) {
270 _log.warn("Message content is null");
271 }
272
273 return;
274 }
275
276 if (object instanceof Serializable) {
277 handleNotification((Serializable)object);
278 }
279 else {
280 if (_log.isWarnEnabled()) {
281 _log.warn(
282 "Unable to process message content of type " +
283 object.getClass().getName());
284 }
285 }
286 }
287
288 }
289
290 }