001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cache.ehcache;
016    
017    import com.liferay.portal.cluster.BaseReceiver;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    
021    import java.io.Serializable;
022    
023    import java.rmi.RemoteException;
024    
025    import java.util.ArrayList;
026    import java.util.List;
027    
028    import net.sf.ehcache.Cache;
029    import net.sf.ehcache.CacheException;
030    import net.sf.ehcache.CacheManager;
031    import net.sf.ehcache.Ehcache;
032    import net.sf.ehcache.Element;
033    import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034    import net.sf.ehcache.distribution.CachePeer;
035    import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036    
037    import org.jgroups.Address;
038    import org.jgroups.JChannel;
039    import org.jgroups.Message;
040    import org.jgroups.View;
041    
042    /**
043     * <p>
044     * See http://issues.liferay.com/browse/LPS-11061.
045     * </p>
046     *
047     * @author Tina Tian
048     */
049    public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050    
051            public JGroupsManager(
052                    CacheManager cacheManager, String clusterName,
053                    String channelProperties) {
054    
055                    try {
056                            _jChannel = new JChannel(channelProperties);
057    
058                            _jChannel.setReceiver(new EhcacheJGroupsReceiver());
059    
060                            _jChannel.connect(clusterName);
061    
062                            if (_log.isInfoEnabled()) {
063                                    _log.info(
064                                            "Create a new channel with properties " +
065                                                    _jChannel.getProperties());
066                            }
067                    }
068                    catch (Exception e) {
069                            if (_log.isErrorEnabled()) {
070                                    _log.error("Unable to initialize channels", e);
071                            }
072                    }
073    
074                    _cacheManager = cacheManager;
075            }
076    
077            @Override
078            public void dispose() throws CacheException {
079                    if (_jChannel != null) {
080                            _jChannel.close();
081                    }
082            }
083    
084            public Address getBusLocalAddress() {
085                    return _jChannel.getAddress();
086            }
087    
088            public List<Address> getBusMembership() {
089                    BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
090    
091                    View view = baseReceiver.getView();
092    
093                    return view.getMembers();
094            }
095    
096            @Override
097            @SuppressWarnings("rawtypes")
098            public List getElements(List list) {
099                    return null;
100            }
101    
102            @Override
103            public String getGuid() {
104                    return null;
105            }
106    
107            @Override
108            @SuppressWarnings("rawtypes")
109            public List getKeys() {
110                    return null;
111            }
112    
113            @Override
114            public String getName() {
115                    return null;
116            }
117    
118            @Override
119            public Element getQuiet(Serializable serializable) {
120                    return null;
121            }
122    
123            @Override
124            public String getScheme() {
125                    return _SCHEME;
126            }
127    
128            @Override
129            public long getTimeForClusterToForm() {
130                    return 0;
131            }
132    
133            @Override
134            public String getUrl() {
135                    return null;
136            }
137    
138            @Override
139            public String getUrlBase() {
140                    return null;
141            }
142    
143            public void handleNotification(Serializable serializable) {
144                    if (serializable instanceof JGroupEventMessage) {
145                            handleJGroupsNotification((JGroupEventMessage)serializable);
146                    }
147                    else if (serializable instanceof List<?>) {
148                            List<?> valueList = (List<?>)serializable;
149    
150                            for (Object object : valueList) {
151                                    if (object instanceof JGroupEventMessage) {
152                                            handleJGroupsNotification((JGroupEventMessage)object);
153                                    }
154                            }
155                    }
156            }
157    
158            @Override
159            public void init() {
160            }
161    
162            @Override
163            public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
164                    List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
165    
166                    cachePeers.add(this);
167    
168                    return cachePeers;
169            }
170    
171            @Override
172            public void put(Element element) {
173            }
174    
175            @Override
176            public void registerPeer(String string) {
177            }
178    
179            @Override
180            public boolean remove(Serializable serializable) {
181                    return false;
182            }
183    
184            @Override
185            public void removeAll() {
186            }
187    
188            @SuppressWarnings("rawtypes")
189            public void send(Address address, List eventMessages)
190                    throws RemoteException {
191    
192                    ArrayList<JGroupEventMessage> jGroupEventMessages =
193                            new ArrayList<JGroupEventMessage>();
194    
195                    for (Object eventMessage : eventMessages) {
196                            if (eventMessage instanceof JGroupEventMessage) {
197                                    JGroupEventMessage jGroupEventMessage =
198                                            (JGroupEventMessage)eventMessage;
199    
200                                    jGroupEventMessages.add(jGroupEventMessage);
201                            }
202                            else {
203                                    if (_log.isDebugEnabled()) {
204                                            _log.debug(
205                                                    eventMessage + "is not a JGroupEventMessage type");
206                                    }
207                            }
208                    }
209    
210                    try {
211                            _jChannel.send(address, null, jGroupEventMessages);
212                    }
213                    catch (Throwable t) {
214                            throw new RemoteException(t.getMessage());
215                    }
216            }
217    
218            @Override
219            @SuppressWarnings("rawtypes")
220            public void send(List eventMessages) throws RemoteException {
221                    send(null, eventMessages);
222            }
223    
224            @Override
225            public void unregisterPeer(String string) {
226            }
227    
228            protected void handleJGroupsNotification(
229                    JGroupEventMessage jGroupEventMessage) {
230    
231                    Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
232    
233                    if (cache == null) {
234                            return;
235                    }
236    
237                    int event = jGroupEventMessage.getEvent();
238                    Serializable key = jGroupEventMessage.getSerializableKey();
239    
240                    if ((event == JGroupEventMessage.REMOVE) &&
241                            (cache.getQuiet(key) != null)) {
242    
243                            cache.remove(key, true);
244                    }
245                    else if (event == JGroupEventMessage.REMOVE_ALL) {
246                            cache.removeAll(true);
247                    }
248                    else if (event == JGroupEventMessage.PUT) {
249                            Element element = jGroupEventMessage.getElement();
250    
251                            cache.put(new Element(key, element.getValue()), true);
252                    }
253            }
254    
255            private static final String _SCHEME = "JGroups";
256    
257            private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
258    
259            private CacheManager _cacheManager;
260            private JChannel _jChannel;
261    
262            private class EhcacheJGroupsReceiver extends BaseReceiver {
263    
264                    @Override
265                    public void receive(Message message) {
266                            Object object = message.getObject();
267    
268                            if (object == null) {
269                                    if (_log.isWarnEnabled()) {
270                                            _log.warn("Message content is null");
271                                    }
272    
273                                    return;
274                            }
275    
276                            if (object instanceof Serializable) {
277                                    handleNotification((Serializable)object);
278                            }
279                            else {
280                                    if (_log.isWarnEnabled()) {
281                                            _log.warn(
282                                                    "Unable to process message content of type " +
283                                                            object.getClass().getName());
284                                    }
285                            }
286                    }
287    
288            }
289    
290    }