001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.cluster.BaseReceiver;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.Serializable;
022
023 import java.rmi.RemoteException;
024
025 import java.util.ArrayList;
026 import java.util.List;
027
028 import net.sf.ehcache.Cache;
029 import net.sf.ehcache.CacheException;
030 import net.sf.ehcache.CacheManager;
031 import net.sf.ehcache.Ehcache;
032 import net.sf.ehcache.Element;
033 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034 import net.sf.ehcache.distribution.CachePeer;
035 import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036
037 import org.jgroups.Address;
038 import org.jgroups.JChannel;
039 import org.jgroups.Message;
040 import org.jgroups.View;
041
042
049 public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050
051 public JGroupsManager(
052 CacheManager cacheManager, String clusterName,
053 String channelProperties) {
054
055 try {
056 _jChannel = new JChannel(channelProperties);
057
058 _jChannel.setReceiver(new EhcacheJGroupsReceiver());
059
060 _jChannel.connect(clusterName);
061
062 if (_log.isInfoEnabled()) {
063 _log.info(
064 "Create a new channel with properties " +
065 _jChannel.getProperties());
066 }
067 }
068 catch (Exception e) {
069 if (_log.isErrorEnabled()) {
070 _log.error("Unable to initialize channels", e);
071 }
072 }
073
074 _cacheManager = cacheManager;
075 }
076
077 public void dispose() throws CacheException {
078 if (_jChannel != null) {
079 _jChannel.close();
080 }
081 }
082
083 public Address getBusLocalAddress() {
084 return _jChannel.getAddress();
085 }
086
087 public List<Address> getBusMembership() {
088 BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
089
090 View view = baseReceiver.getView();
091
092 return view.getMembers();
093 }
094
095 @SuppressWarnings("rawtypes")
096 public List getElements(List list) {
097 return null;
098 }
099
100 public String getGuid() {
101 return null;
102 }
103
104 @SuppressWarnings("rawtypes")
105 public List getKeys() {
106 return null;
107 }
108
109 public String getName() {
110 return null;
111 }
112
113 public Element getQuiet(Serializable serializable) {
114 return null;
115 }
116
117 public String getScheme() {
118 return _SCHEME;
119 }
120
121 public long getTimeForClusterToForm() {
122 return 0;
123 }
124
125 public String getUrl() {
126 return null;
127 }
128
129 public String getUrlBase() {
130 return null;
131 }
132
133 public void handleNotification(Serializable serializable) {
134 if (serializable instanceof JGroupEventMessage) {
135 handleJGroupsNotification((JGroupEventMessage)serializable);
136 }
137 else if (serializable instanceof List<?>) {
138 List<?> valueList = (List<?>)serializable;
139
140 for (Object object : valueList) {
141 if (object instanceof JGroupEventMessage) {
142 handleJGroupsNotification((JGroupEventMessage)object);
143 }
144 }
145 }
146 }
147
148 public void init() {
149 }
150
151 public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
152 List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
153
154 cachePeers.add(this);
155
156 return cachePeers;
157 }
158
159 public void put(Element element) {
160 }
161
162 public void registerPeer(String string) {
163 }
164
165 public boolean remove(Serializable serializable) {
166 return false;
167 }
168
169 public void removeAll() {
170 }
171
172 @SuppressWarnings("rawtypes")
173 public void send(Address address, List eventMessages)
174 throws RemoteException {
175
176 ArrayList<JGroupEventMessage> jGroupEventMessages =
177 new ArrayList<JGroupEventMessage>();
178
179 for (Object eventMessage : eventMessages) {
180 if (eventMessage instanceof JGroupEventMessage) {
181 JGroupEventMessage jGroupEventMessage =
182 (JGroupEventMessage)eventMessage;
183
184 jGroupEventMessages.add(jGroupEventMessage);
185 }
186 else {
187 if (_log.isDebugEnabled()) {
188 _log.debug(
189 eventMessage + "is not a JGroupEventMessage type");
190 }
191 }
192 }
193
194 try {
195 _jChannel.send(address, jGroupEventMessages);
196 }
197 catch (Throwable t) {
198 throw new RemoteException(t.getMessage());
199 }
200 }
201
202 @SuppressWarnings("rawtypes")
203 public void send(List eventMessages) throws RemoteException {
204 send(null, eventMessages);
205 }
206
207 public void unregisterPeer(String string) {
208 }
209
210 protected void handleJGroupsNotification(
211 JGroupEventMessage jGroupEventMessage) {
212
213 Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
214
215 if (cache == null) {
216 return;
217 }
218
219 int event = jGroupEventMessage.getEvent();
220 Serializable key = jGroupEventMessage.getSerializableKey();
221
222 if ((event == JGroupEventMessage.REMOVE) &&
223 (cache.getQuiet(key) != null)) {
224
225 cache.remove(key, true);
226 }
227 else if (event == JGroupEventMessage.REMOVE_ALL) {
228 cache.removeAll(true);
229 }
230 else if (event == JGroupEventMessage.PUT) {
231 Element element = jGroupEventMessage.getElement();
232
233 cache.put(new Element(key, element.getValue()), true);
234 }
235 }
236
237 private static final String _SCHEME = "JGroups";
238
239 private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
240
241 private CacheManager _cacheManager;
242 private JChannel _jChannel;
243
244 private class EhcacheJGroupsReceiver extends BaseReceiver {
245
246 @Override
247 public void receive(Message message) {
248 Object object = message.getObject();
249
250 if (object == null) {
251 if (_log.isWarnEnabled()) {
252 _log.warn("Message content is null");
253 }
254
255 return;
256 }
257
258 if (object instanceof Serializable) {
259 handleNotification((Serializable)object);
260 }
261 else {
262 if (_log.isWarnEnabled()) {
263 _log.warn(
264 "Unable to process message content of type " +
265 object.getClass().getName());
266 }
267 }
268 }
269
270 }
271
272 }