001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cache.ehcache;
016    
017    import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018    import com.liferay.portal.kernel.cluster.Address;
019    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
020    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
021    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022    import com.liferay.portal.kernel.cluster.ClusterRequest;
023    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024    import com.liferay.portal.kernel.exception.SystemException;
025    import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
026    import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
027    import com.liferay.portal.kernel.log.Log;
028    import com.liferay.portal.kernel.log.LogFactoryUtil;
029    import com.liferay.portal.kernel.util.MethodHandler;
030    import com.liferay.portal.kernel.util.MethodKey;
031    import com.liferay.portal.kernel.util.SocketUtil;
032    import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
033    import com.liferay.portal.util.PropsValues;
034    
035    import java.io.IOException;
036    import java.io.ObjectInputStream;
037    import java.io.ObjectOutputStream;
038    import java.io.Serializable;
039    
040    import java.net.ServerSocket;
041    import java.net.Socket;
042    import java.net.SocketAddress;
043    import java.net.SocketException;
044    import java.net.SocketTimeoutException;
045    
046    import java.nio.channels.ServerSocketChannel;
047    
048    import java.util.ArrayList;
049    import java.util.Arrays;
050    import java.util.List;
051    import java.util.concurrent.BlockingQueue;
052    import java.util.concurrent.TimeUnit;
053    
054    import net.sf.ehcache.CacheManager;
055    import net.sf.ehcache.Ehcache;
056    import net.sf.ehcache.Element;
057    
058    /**
059     * @author Shuyang Zhou
060     * @author Sherry Yang
061     */
062    public class EhcacheStreamBootstrapHelpUtil {
063    
064            public static SocketAddress createServerSocketFromCluster(
065                            List<String> allCacheNames, List<String> newCacheNames)
066                    throws Exception {
067    
068                    ServerSocketChannel serverSocketChannel =
069                            SocketUtil.createServerSocketChannel(
070                                    ClusterLinkUtil.getBindInetAddress(),
071                                    PropsValues.EHCACHE_SOCKET_START_PORT,
072                                    _serverSocketConfigurator);
073    
074                    ServerSocket serverSocket = serverSocketChannel.socket();
075    
076                    EhcachePortalCacheManager<?, ?> ehcachePortalCacheManager =
077                            (EhcachePortalCacheManager<?, ?>)PortalBeanLocatorUtil.locate(
078                                    _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
079    
080                    CacheManager cacheManager =
081                            ehcachePortalCacheManager.getEhcacheManager();
082    
083                    if (allCacheNames != null) {
084                            newCacheNames.addAll(Arrays.asList(cacheManager.getCacheNames()));
085    
086                            newCacheNames.removeAll(allCacheNames);
087                    }
088    
089                    EhcacheStreamServerThread ehcacheStreamServerThread =
090                            new EhcacheStreamServerThread(
091                                    serverSocket, cacheManager, newCacheNames);
092    
093                    ehcacheStreamServerThread.start();
094    
095                    return serverSocket.getLocalSocketAddress();
096            }
097    
098            protected static void loadCachesFromCluster(
099                            boolean synchronizeCaches, Ehcache... newEhcaches)
100                    throws Exception {
101    
102                    List<Address> clusterNodeAddresses =
103                            ClusterExecutorUtil.getClusterNodeAddresses();
104    
105                    if (_log.isInfoEnabled()) {
106                            _log.info("Cluster node addresses " + clusterNodeAddresses);
107                    }
108    
109                    if (clusterNodeAddresses.size() <= 1) {
110                            if (_log.isDebugEnabled()) {
111                                    _log.debug(
112                                            "Not loading cache from cluster because a cluster peer " +
113                                                    "was not found");
114                            }
115    
116                            return;
117                    }
118    
119                    EhcachePortalCacheManager<?, ?> ehcachePortalCacheManager =
120                            (EhcachePortalCacheManager<?, ?>)PortalBeanLocatorUtil.locate(
121                                    _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
122    
123                    CacheManager cacheManager =
124                            ehcachePortalCacheManager.getEhcacheManager();
125    
126                    List<String> allCacheNames = null;
127    
128                    if (synchronizeCaches) {
129                            allCacheNames = Arrays.asList(cacheManager.getCacheNames());
130                    }
131    
132                    List<String> newCacheNames = new ArrayList<String>();
133    
134                    for (Ehcache ehcache : newEhcaches) {
135                            newCacheNames.add(ehcache.getName());
136                    }
137    
138                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
139                            new MethodHandler(
140                                    _createServerSocketFromClusterMethodKey, allCacheNames,
141                                    newCacheNames),
142                            true);
143    
144                    FutureClusterResponses futureClusterResponses =
145                            ClusterExecutorUtil.execute(clusterRequest);
146    
147                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
148                            futureClusterResponses.getPartialResults();
149    
150                    ClusterNodeResponse clusterNodeResponse = null;
151    
152                    try {
153                            clusterNodeResponse = clusterNodeResponses.poll(
154                                    PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
155                                    TimeUnit.MILLISECONDS);
156                    }
157                    catch (InterruptedException ie) {
158                            return;
159                    }
160    
161                    if (clusterNodeResponse == null) {
162                            if (_log.isWarnEnabled()) {
163                                    _log.warn(
164                                            "Unable to load cache from the cluster because there " +
165                                                    "was no peer response");
166                            }
167    
168                            return;
169                    }
170    
171                    Socket socket = null;
172                    ObjectInputStream objectInputStream = null;
173    
174                    try {
175                            SocketAddress remoteSocketAddress =
176                                    (SocketAddress)clusterNodeResponse.getResult();
177    
178                            if (remoteSocketAddress == null) {
179                                    _log.error(
180                                            "Cluster peer " + clusterNodeResponse.getClusterNode() +
181                                                    " responded with a null socket address");
182    
183                                    return;
184                            }
185    
186                            socket = new Socket();
187    
188                            socket.connect(remoteSocketAddress);
189    
190                            socket.shutdownOutput();
191    
192                            objectInputStream = new AnnotatedObjectInputStream(
193                                    socket.getInputStream());
194    
195                            Ehcache ehcache = null;
196    
197                            while (true) {
198                                    Object object = objectInputStream.readObject();
199    
200                                    if (object instanceof EhcacheElement) {
201                                            EhcacheElement ehcacheElement = (EhcacheElement)object;
202    
203                                            Element element = ehcacheElement.toElement();
204    
205                                            ehcache.put(element, true);
206                                    }
207                                    else if (object instanceof String) {
208                                            if (_COMMAND_SOCKET_CLOSE.equals(object)) {
209                                                    break;
210                                            }
211    
212                                            EhcacheStreamBootstrapCacheLoader.setSkip();
213    
214                                            try {
215                                                    ehcache = cacheManager.addCacheIfAbsent((String)object);
216                                            }
217                                            finally {
218                                                    EhcacheStreamBootstrapCacheLoader.resetSkip();
219                                            }
220                                    }
221                                    else {
222                                            throw new SystemException(
223                                                    "Socket input stream returned invalid object " +
224                                                            object);
225                                    }
226                            }
227                    }
228                    finally {
229                            if (objectInputStream != null) {
230                                    objectInputStream.close();
231                            }
232    
233                            if (socket != null) {
234                                    socket.close();
235                            }
236                    }
237            }
238    
239            private static final String _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER =
240                    "com.liferay.portal.kernel.cache.MultiVMPortalCacheManager";
241    
242            private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
243    
244            private static Log _log = LogFactoryUtil.getLog(
245                    EhcacheStreamBootstrapHelpUtil.class);
246    
247            private static MethodKey _createServerSocketFromClusterMethodKey =
248                    new MethodKey(
249                            EhcacheStreamBootstrapHelpUtil.class,
250                            "createServerSocketFromCluster", List.class, List.class);
251            private static ServerSocketConfigurator _serverSocketConfigurator =
252                    new SocketCacheServerSocketConfiguration();
253    
254            private static class EhcacheElement implements Serializable {
255    
256                    public EhcacheElement(Serializable key, Serializable value) {
257                            _key = key;
258                            _value = value;
259                    }
260    
261                    public Element toElement() {
262                            return new Element(_key, _value);
263                    }
264    
265                    private Serializable _key;
266                    private Serializable _value;
267    
268            }
269    
270            private static class EhcacheStreamServerThread extends Thread {
271    
272                    public EhcacheStreamServerThread(
273                            ServerSocket serverSocket, CacheManager cacheManager,
274                            List<String> cacheNames) {
275    
276                            _serverSocket = serverSocket;
277                            _cacheManager = cacheManager;
278                            _cacheNames = cacheNames;
279    
280                            setDaemon(true);
281                            setName(
282                                    EhcacheStreamServerThread.class.getName() + " - " + cacheNames);
283                            setPriority(Thread.NORM_PRIORITY);
284                    }
285    
286                    @Override
287                    public void run() {
288                            Socket socket = null;
289    
290                            try {
291                                    try {
292                                            socket = _serverSocket.accept();
293                                    }
294                                    catch (SocketTimeoutException ste) {
295                                            if (_log.isDebugEnabled()) {
296                                                    _log.debug(
297                                                            "Terminating the socket thread " + getName() +
298                                                                    " that the client requested but never used");
299                                            }
300    
301                                            return;
302                                    }
303    
304                                    _serverSocket.close();
305    
306                                    socket.shutdownInput();
307    
308                                    ObjectOutputStream objectOutputStream =
309                                            new AnnotatedObjectOutputStream(socket.getOutputStream());
310    
311                                    for (String cacheName : _cacheNames) {
312                                            Ehcache ehcache = _cacheManager.getCache(cacheName);
313    
314                                            if (ehcache == null) {
315                                                    EhcacheStreamBootstrapCacheLoader.setSkip();
316    
317                                                    try {
318                                                            _cacheManager.addCache(cacheName);
319                                                    }
320                                                    finally {
321                                                            EhcacheStreamBootstrapCacheLoader.resetSkip();
322                                                    }
323    
324                                                    continue;
325                                            }
326    
327                                            objectOutputStream.writeObject(cacheName);
328    
329                                            List<Object> keys = ehcache.getKeys();
330    
331                                            for (Object key : keys) {
332                                                    if (!(key instanceof Serializable)) {
333                                                            if (_log.isWarnEnabled()) {
334                                                                    _log.warn(
335                                                                            "Key " + key + " is not serializable");
336                                                            }
337    
338                                                            continue;
339                                                    }
340    
341                                                    Element element = ehcache.get(key);
342    
343                                                    if (element == null) {
344                                                            continue;
345                                                    }
346    
347                                                    Object value = element.getObjectValue();
348    
349                                                    if (!(value instanceof Serializable)) {
350                                                            if (_log.isWarnEnabled() && (value != null)) {
351                                                                    _log.warn(
352                                                                            "Value " + value + " is not serializable");
353                                                            }
354    
355                                                            continue;
356                                                    }
357    
358                                                    EhcacheElement ehcacheElement = new EhcacheElement(
359                                                            (Serializable)key, (Serializable)value);
360    
361                                                    objectOutputStream.writeObject(ehcacheElement);
362                                            }
363                                    }
364    
365                                    objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
366    
367                                    objectOutputStream.close();
368                            }
369                            catch (Exception e) {
370                                    throw new RuntimeException(e);
371                            }
372                            finally {
373                                    if (socket != null) {
374                                            try {
375                                                    socket.close();
376                                            }
377                                            catch (IOException ioe) {
378                                                    throw new RuntimeException(ioe);
379                                            }
380                                    }
381                            }
382                    }
383    
384                    private CacheManager _cacheManager;
385                    private List<String> _cacheNames;
386                    private ServerSocket _serverSocket;
387    
388            }
389    
390            private static class SocketCacheServerSocketConfiguration
391                    implements ServerSocketConfigurator {
392    
393                    @Override
394                    public void configure(ServerSocket serverSocket)
395                            throws SocketException {
396    
397                            serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
398                    }
399    
400            }
401    
402    }