001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cache.ehcache;
016    
017    import com.liferay.portal.cluster.BaseReceiver;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    
021    import java.io.Serializable;
022    
023    import java.rmi.RemoteException;
024    
025    import java.util.ArrayList;
026    import java.util.List;
027    
028    import net.sf.ehcache.Cache;
029    import net.sf.ehcache.CacheException;
030    import net.sf.ehcache.CacheManager;
031    import net.sf.ehcache.Ehcache;
032    import net.sf.ehcache.Element;
033    import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034    import net.sf.ehcache.distribution.CachePeer;
035    import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036    
037    import org.jgroups.Address;
038    import org.jgroups.JChannel;
039    import org.jgroups.Message;
040    import org.jgroups.View;
041    
042    /**
043     * <p>
044     * See https://issues.liferay.com/browse/LPS-11061.
045     * </p>
046     *
047     * @author Tina Tian
048     */
049    public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050    
051            public JGroupsManager(
052                    CacheManager cacheManager, String clusterName,
053                    String channelProperties) {
054    
055                    _cacheManager = cacheManager;
056    
057                    JChannel jChannel = null;
058    
059                    try {
060                            jChannel = new JChannel(channelProperties);
061    
062                            jChannel.setReceiver(new EhcacheJGroupsReceiver());
063    
064                            jChannel.connect(clusterName);
065    
066                            if (_log.isInfoEnabled()) {
067                                    _log.info(
068                                            "Create a new channel with properties " +
069                                                    jChannel.getProperties());
070                            }
071                    }
072                    catch (Exception e) {
073                            if (_log.isErrorEnabled()) {
074                                    _log.error("Unable to initialize channels", e);
075                            }
076                    }
077    
078                    _jChannel = jChannel;
079            }
080    
081            @Override
082            public void dispose() throws CacheException {
083                    if (_jChannel != null) {
084                            _jChannel.close();
085                    }
086            }
087    
088            public Address getBusLocalAddress() {
089                    return _jChannel.getAddress();
090            }
091    
092            public List<Address> getBusMembership() {
093                    BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
094    
095                    View view = baseReceiver.getView();
096    
097                    return view.getMembers();
098            }
099    
100            @Override
101            @SuppressWarnings("rawtypes")
102            public List getElements(List list) {
103                    return null;
104            }
105    
106            @Override
107            public String getGuid() {
108                    return null;
109            }
110    
111            @Override
112            @SuppressWarnings("rawtypes")
113            public List getKeys() {
114                    return null;
115            }
116    
117            @Override
118            public String getName() {
119                    return null;
120            }
121    
122            @Override
123            public Element getQuiet(Serializable serializable) {
124                    return null;
125            }
126    
127            @Override
128            public String getScheme() {
129                    return _SCHEME;
130            }
131    
132            @Override
133            public long getTimeForClusterToForm() {
134                    return 0;
135            }
136    
137            @Override
138            public String getUrl() {
139                    return null;
140            }
141    
142            @Override
143            public String getUrlBase() {
144                    return null;
145            }
146    
147            public void handleNotification(Serializable serializable) {
148                    if (serializable instanceof JGroupEventMessage) {
149                            handleJGroupsNotification((JGroupEventMessage)serializable);
150                    }
151                    else if (serializable instanceof List<?>) {
152                            List<?> valueList = (List<?>)serializable;
153    
154                            for (Object object : valueList) {
155                                    if (object instanceof JGroupEventMessage) {
156                                            handleJGroupsNotification((JGroupEventMessage)object);
157                                    }
158                            }
159                    }
160            }
161    
162            @Override
163            public void init() {
164            }
165    
166            @Override
167            public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
168                    List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
169    
170                    cachePeers.add(this);
171    
172                    return cachePeers;
173            }
174    
175            @Override
176            public void put(Element element) {
177            }
178    
179            @Override
180            public void registerPeer(String string) {
181            }
182    
183            @Override
184            public boolean remove(Serializable serializable) {
185                    return false;
186            }
187    
188            @Override
189            public void removeAll() {
190            }
191    
192            @SuppressWarnings("rawtypes")
193            public void send(Address address, List eventMessages)
194                    throws RemoteException {
195    
196                    ArrayList<JGroupEventMessage> jGroupEventMessages =
197                            new ArrayList<JGroupEventMessage>();
198    
199                    for (Object eventMessage : eventMessages) {
200                            if (eventMessage instanceof JGroupEventMessage) {
201                                    JGroupEventMessage jGroupEventMessage =
202                                            (JGroupEventMessage)eventMessage;
203    
204                                    jGroupEventMessages.add(jGroupEventMessage);
205                            }
206                            else {
207                                    if (_log.isDebugEnabled()) {
208                                            _log.debug(
209                                                    eventMessage + "is not a JGroupEventMessage type");
210                                    }
211                            }
212                    }
213    
214                    try {
215                            _jChannel.send(address, jGroupEventMessages);
216                    }
217                    catch (Throwable t) {
218                            throw new RemoteException(t.getMessage());
219                    }
220            }
221    
222            @Override
223            @SuppressWarnings("rawtypes")
224            public void send(List eventMessages) throws RemoteException {
225                    send(null, eventMessages);
226            }
227    
228            @Override
229            public void unregisterPeer(String string) {
230            }
231    
232            protected void handleJGroupsNotification(
233                    JGroupEventMessage jGroupEventMessage) {
234    
235                    Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
236    
237                    if (cache == null) {
238                            return;
239                    }
240    
241                    int event = jGroupEventMessage.getEvent();
242    
243                    if (event == JGroupEventMessage.REMOVE_ALL) {
244                            cache.removeAll(true);
245    
246                            return;
247                    }
248    
249                    Serializable key = jGroupEventMessage.getSerializableKey();
250    
251                    if (key == null) {
252                            throw new NullPointerException("Key is null");
253                    }
254    
255                    if ((event == JGroupEventMessage.REMOVE) &&
256                            (cache.getQuiet(key) != null)) {
257    
258                            cache.remove(key, true);
259                    }
260                    else if (event == JGroupEventMessage.PUT) {
261                            Element element = jGroupEventMessage.getElement();
262    
263                            Object value = element.getObjectValue();
264    
265                            if (value == null) {
266                                    cache.remove(key, true);
267                            }
268                            else {
269                                    cache.put(new Element(key, value), true);
270                            }
271                    }
272            }
273    
274            private static final String _SCHEME = "JGroups";
275    
276            private static final Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
277    
278            private final CacheManager _cacheManager;
279            private final JChannel _jChannel;
280    
281            private class EhcacheJGroupsReceiver extends BaseReceiver {
282    
283                    @Override
284                    public void receive(Message message) {
285                            Object object = message.getObject();
286    
287                            if (object == null) {
288                                    if (_log.isWarnEnabled()) {
289                                            _log.warn("Message content is null");
290                                    }
291    
292                                    return;
293                            }
294    
295                            if (object instanceof Serializable) {
296                                    handleNotification((Serializable)object);
297                            }
298                            else {
299                                    if (_log.isWarnEnabled()) {
300                                            _log.warn(
301                                                    "Unable to process message content of type " +
302                                                            object.getClass().getName());
303                                    }
304                            }
305                    }
306    
307            }
308    
309    }