001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cache.ehcache;
016    
017    import com.liferay.portal.cluster.BaseReceiver;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    
021    import java.io.Serializable;
022    
023    import java.rmi.RemoteException;
024    
025    import java.util.ArrayList;
026    import java.util.List;
027    
028    import net.sf.ehcache.Cache;
029    import net.sf.ehcache.CacheException;
030    import net.sf.ehcache.CacheManager;
031    import net.sf.ehcache.Ehcache;
032    import net.sf.ehcache.Element;
033    import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034    import net.sf.ehcache.distribution.CachePeer;
035    import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036    
037    import org.jgroups.Address;
038    import org.jgroups.JChannel;
039    import org.jgroups.Message;
040    import org.jgroups.View;
041    
042    /**
043     * <p>
044     * See http://issues.liferay.com/browse/LPS-11061.
045     * </p>
046     *
047     * @author Tina Tian
048     */
049    public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050    
051            public JGroupsManager(
052                    CacheManager cacheManager, String clusterName,
053                    String channelProperties) {
054    
055                    try {
056                            _jChannel = new JChannel(channelProperties);
057    
058                            _jChannel.setReceiver(new EhcacheJGroupsReceiver());
059    
060                            _jChannel.connect(clusterName);
061    
062                            if (_log.isInfoEnabled()) {
063                                    _log.info(
064                                            "Create a new channel with properties " +
065                                                    _jChannel.getProperties());
066                            }
067                    }
068                    catch (Exception e) {
069                            if (_log.isErrorEnabled()) {
070                                    _log.error("Unable to initialize channels", e);
071                            }
072                    }
073    
074                    _cacheManager = cacheManager;
075            }
076    
077            public void dispose() throws CacheException {
078                    if (_jChannel != null) {
079                            _jChannel.close();
080                    }
081            }
082    
083            public Address getBusLocalAddress() {
084                    return _jChannel.getAddress();
085            }
086    
087            public List<Address> getBusMembership() {
088                    BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
089    
090                    View view = baseReceiver.getView();
091    
092                    return view.getMembers();
093            }
094    
095            @SuppressWarnings("rawtypes")
096            public List getElements(List list) {
097                    return null;
098            }
099    
100            public String getGuid() {
101                    return null;
102            }
103    
104            @SuppressWarnings("rawtypes")
105            public List getKeys() {
106                    return null;
107            }
108    
109            public String getName() {
110                    return null;
111            }
112    
113            public Element getQuiet(Serializable serializable) {
114                    return null;
115            }
116    
117            public String getScheme() {
118                    return _SCHEME;
119            }
120    
121            public long getTimeForClusterToForm() {
122                    return 0;
123            }
124    
125            public String getUrl() {
126                    return null;
127            }
128    
129            public String getUrlBase() {
130                    return null;
131            }
132    
133            public void handleNotification(Serializable serializable) {
134                    if (serializable instanceof JGroupEventMessage) {
135                            handleJGroupsNotification((JGroupEventMessage)serializable);
136                    }
137                    else if (serializable instanceof List<?>) {
138                            List<?> valueList = (List<?>)serializable;
139    
140                            for (Object object : valueList) {
141                                    if (object instanceof JGroupEventMessage) {
142                                            handleJGroupsNotification((JGroupEventMessage)object);
143                                    }
144                            }
145                    }
146            }
147    
148            public void init() {
149            }
150    
151            public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
152                    List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
153    
154                    cachePeers.add(this);
155    
156                    return cachePeers;
157            }
158    
159            public void put(Element element) {
160            }
161    
162            public void registerPeer(String string) {
163            }
164    
165            public boolean remove(Serializable serializable) {
166                    return false;
167            }
168    
169            public void removeAll() {
170            }
171    
172            @SuppressWarnings("rawtypes")
173            public void send(Address address, List eventMessages)
174                    throws RemoteException {
175    
176                    ArrayList<JGroupEventMessage> jGroupEventMessages =
177                            new ArrayList<JGroupEventMessage>();
178    
179                    for (Object eventMessage : eventMessages) {
180                            if (eventMessage instanceof JGroupEventMessage) {
181                                    JGroupEventMessage jGroupEventMessage =
182                                            (JGroupEventMessage)eventMessage;
183    
184                                    jGroupEventMessages.add(jGroupEventMessage);
185                            }
186                            else {
187                                    if (_log.isDebugEnabled()) {
188                                            _log.debug(
189                                                    eventMessage + "is not a JGroupEventMessage type");
190                                    }
191                            }
192                    }
193    
194                    try {
195                            _jChannel.send(address, jGroupEventMessages);
196                    }
197                    catch (Throwable t) {
198                            throw new RemoteException(t.getMessage());
199                    }
200            }
201    
202            @SuppressWarnings("rawtypes")
203            public void send(List eventMessages) throws RemoteException {
204                    send(null, eventMessages);
205            }
206    
207            public void unregisterPeer(String string) {
208            }
209    
210            protected void handleJGroupsNotification(
211                    JGroupEventMessage jGroupEventMessage) {
212    
213                    Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
214    
215                    if (cache == null) {
216                            return;
217                    }
218    
219                    int event = jGroupEventMessage.getEvent();
220                    Serializable key = jGroupEventMessage.getSerializableKey();
221    
222                    if ((event == JGroupEventMessage.REMOVE) &&
223                            (cache.getQuiet(key) != null)) {
224    
225                            cache.remove(key, true);
226                    }
227                    else if (event == JGroupEventMessage.REMOVE_ALL) {
228                            cache.removeAll(true);
229                    }
230                    else if (event == JGroupEventMessage.PUT) {
231                            Element element = jGroupEventMessage.getElement();
232    
233                            cache.put(new Element(key, element.getValue()), true);
234                    }
235            }
236    
237            private static final String _SCHEME = "JGroups";
238    
239            private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
240    
241            private CacheManager _cacheManager;
242            private JChannel _jChannel;
243    
244            private class EhcacheJGroupsReceiver extends BaseReceiver {
245    
246                    @Override
247                    public void receive(Message message) {
248                            Object object = message.getObject();
249    
250                            if (object == null) {
251                                    if (_log.isWarnEnabled()) {
252                                            _log.warn("Message content is null");
253                                    }
254    
255                                    return;
256                            }
257    
258                            if (object instanceof Serializable) {
259                                    handleNotification((Serializable)object);
260                            }
261                            else {
262                                    if (_log.isWarnEnabled()) {
263                                            _log.warn(
264                                                    "Unable to process message content of type " +
265                                                            object.getClass().getName());
266                                    }
267                            }
268                    }
269    
270            }
271    
272    }