001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.cluster.BaseReceiver;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.Serializable;
022
023 import java.rmi.RemoteException;
024
025 import java.util.ArrayList;
026 import java.util.List;
027
028 import net.sf.ehcache.Cache;
029 import net.sf.ehcache.CacheException;
030 import net.sf.ehcache.CacheManager;
031 import net.sf.ehcache.Ehcache;
032 import net.sf.ehcache.Element;
033 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034 import net.sf.ehcache.distribution.CachePeer;
035 import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036
037 import org.jgroups.Address;
038 import org.jgroups.JChannel;
039 import org.jgroups.Message;
040
041
048 public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
049
050 public JGroupsManager(
051 CacheManager cacheManager, String clusterName,
052 String channelProperties) {
053
054 try {
055 _jChannel = new JChannel(channelProperties);
056
057 _jChannel.setReceiver(new EhcacheJGroupsReceiver());
058
059 _jChannel.connect(clusterName);
060
061 if (_log.isInfoEnabled()) {
062 _log.info(
063 "Create a new channel with properties " +
064 _jChannel.getProperties());
065 }
066 }
067 catch (Exception e) {
068 if (_log.isErrorEnabled()) {
069 _log.error("Unable to initialize channels", e);
070 }
071 }
072
073 _cacheManager = cacheManager;
074 }
075
076 public void dispose() throws CacheException {
077 if (_jChannel != null) {
078 _jChannel.close();
079 }
080 }
081
082 public Address getBusLocalAddress() {
083 return _jChannel.getAddress();
084 }
085
086 public List<Address> getBusMembership() {
087 return _jChannel.getView().getMembers();
088 }
089
090 @SuppressWarnings("rawtypes")
091 public List getElements(List list) {
092 return null;
093 }
094
095 public String getGuid() {
096 return null;
097 }
098
099 @SuppressWarnings("rawtypes")
100 public List getKeys() {
101 return null;
102 }
103
104 public String getName() {
105 return null;
106 }
107
108 public Element getQuiet(Serializable serializable) {
109 return null;
110 }
111
112 public String getScheme() {
113 return _SCHEME;
114 }
115
116 public long getTimeForClusterToForm() {
117 return 0;
118 }
119
120 public String getUrl() {
121 return null;
122 }
123
124 public String getUrlBase() {
125 return null;
126 }
127
128 public void handleNotification(Serializable serializable) {
129 if (serializable instanceof JGroupEventMessage) {
130 handleJGroupsNotification((JGroupEventMessage)serializable);
131 }
132 else if (serializable instanceof List<?>) {
133 List<?> valueList = (List<?>)serializable;
134
135 for (Object object : valueList) {
136 if (object instanceof JGroupEventMessage) {
137 handleJGroupsNotification((JGroupEventMessage)object);
138 }
139 }
140 }
141 }
142
143 public void init() {
144 }
145
146 public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
147 List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
148
149 cachePeers.add(this);
150
151 return cachePeers;
152 }
153
154 public void put(Element element) {
155 }
156
157 public void registerPeer(String string) {
158 }
159
160 public boolean remove(Serializable serializable) {
161 return false;
162 }
163
164 public void removeAll() {
165 }
166
167 @SuppressWarnings("rawtypes")
168 public void send(Address address, List eventMessages)
169 throws RemoteException {
170
171 ArrayList<JGroupEventMessage> jGroupEventMessages =
172 new ArrayList<JGroupEventMessage>();
173
174 for (Object eventMessage : eventMessages) {
175 if (eventMessage instanceof JGroupEventMessage) {
176 JGroupEventMessage jGroupEventMessage =
177 (JGroupEventMessage)eventMessage;
178
179 jGroupEventMessages.add(jGroupEventMessage);
180 }
181 else {
182 if (_log.isDebugEnabled()) {
183 _log.debug(
184 eventMessage + "is not a JGroupEventMessage type");
185 }
186 }
187 }
188
189 try {
190 _jChannel.send(address, null, jGroupEventMessages);
191 }
192 catch (Throwable t) {
193 throw new RemoteException(t.getMessage());
194 }
195 }
196
197 @SuppressWarnings("rawtypes")
198 public void send(List eventMessages) throws RemoteException {
199 send(null, eventMessages);
200 }
201
202 public void unregisterPeer(String string) {
203 }
204
205 protected void handleJGroupsNotification(
206 JGroupEventMessage jGroupEventMessage) {
207
208 Cache cache = _cacheManager.getCache(
209 jGroupEventMessage.getCacheName());
210
211 if (cache == null) {
212 return;
213 }
214
215 int event = jGroupEventMessage.getEvent();
216 Serializable key = jGroupEventMessage.getSerializableKey();
217
218 if ((event == JGroupEventMessage.REMOVE) &&
219 (cache.getQuiet(key) != null)) {
220
221 cache.remove(key, true);
222 }
223 else if (event == JGroupEventMessage.REMOVE_ALL) {
224 cache.removeAll(true);
225 }
226 else if (event == JGroupEventMessage.PUT) {
227 Element element = jGroupEventMessage.getElement();
228
229 cache.put(new Element(key, element.getValue()), true);
230 }
231 }
232
233 private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
234
235 private static final String _SCHEME = "JGroups";
236
237 private CacheManager _cacheManager;
238 private JChannel _jChannel;
239
240 private class EhcacheJGroupsReceiver extends BaseReceiver {
241
242 @Override
243 public void receive(Message message) {
244 Object object = message.getObject();
245
246 if (object == null) {
247 if (_log.isWarnEnabled()) {
248 _log.warn("Message content is null");
249 }
250
251 return;
252 }
253
254 if (object instanceof Serializable) {
255 handleNotification((Serializable)object);
256 }
257 else {
258 if (_log.isWarnEnabled()) {
259 _log.warn(
260 "Unable to process message content of type " +
261 object.getClass().getName());
262 }
263 }
264 }
265
266 }
267
268 }