001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cache.bootstrap;
016    
017    import com.liferay.portal.kernel.cache.PortalCache;
018    import com.liferay.portal.kernel.cache.PortalCacheManager;
019    import com.liferay.portal.kernel.cache.PortalCacheProvider;
020    import com.liferay.portal.kernel.cluster.Address;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.exception.SystemException;
027    import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
028    import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
029    import com.liferay.portal.kernel.log.Log;
030    import com.liferay.portal.kernel.log.LogFactoryUtil;
031    import com.liferay.portal.kernel.util.InitialThreadLocal;
032    import com.liferay.portal.kernel.util.MethodHandler;
033    import com.liferay.portal.kernel.util.MethodKey;
034    import com.liferay.portal.kernel.util.SocketUtil;
035    import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
036    import com.liferay.portal.util.PropsValues;
037    
038    import java.io.IOException;
039    import java.io.ObjectInputStream;
040    import java.io.ObjectOutputStream;
041    import java.io.OutputStream;
042    import java.io.Serializable;
043    
044    import java.net.ServerSocket;
045    import java.net.Socket;
046    import java.net.SocketAddress;
047    import java.net.SocketException;
048    import java.net.SocketTimeoutException;
049    
050    import java.nio.channels.ServerSocketChannel;
051    
052    import java.util.ArrayList;
053    import java.util.Arrays;
054    import java.util.HashMap;
055    import java.util.List;
056    import java.util.Map;
057    import java.util.concurrent.BlockingQueue;
058    import java.util.concurrent.TimeUnit;
059    
060    /**
061     * @author Shuyang Zhou
062     * @author Sherry Yang
063     */
064    public class ClusterLinkBootstrapLoaderHelperUtil {
065    
066            public static SocketAddress createServerSocketFromCluster(
067                            String portalCacheManagerName, List<String> portalCacheNames)
068                    throws Exception {
069    
070                    ServerSocketChannel serverSocketChannel =
071                            SocketUtil.createServerSocketChannel(
072                                    ClusterLinkUtil.getBindInetAddress(),
073                                    PropsValues.EHCACHE_SOCKET_START_PORT,
074                                    _serverSocketConfigurator);
075    
076                    ServerSocket serverSocket = serverSocketChannel.socket();
077    
078                    ClusterLinkBootstrapLoaderServerThread
079                            clusterLinkBootstrapLoaderServerThread =
080                                    new ClusterLinkBootstrapLoaderServerThread(
081                                            serverSocket, portalCacheManagerName, portalCacheNames);
082    
083                    clusterLinkBootstrapLoaderServerThread.start();
084    
085                    return serverSocket.getLocalSocketAddress();
086            }
087    
088            public static synchronized void start() {
089                    if (!_started) {
090                            _started = true;
091                    }
092    
093                    if (_deferredPortalCaches.isEmpty()) {
094                            return;
095                    }
096    
097                    if (_log.isDebugEnabled()) {
098                            _log.debug("Loading deferred caches");
099                    }
100    
101                    try {
102                            for (Map.Entry<String, List<String>> entry :
103                                            _deferredPortalCaches.entrySet()) {
104    
105                                    List<String> portalCacheNames = entry.getValue();
106    
107                                    if (portalCacheNames.isEmpty()) {
108                                            continue;
109                                    }
110    
111                                    loadCachesFromCluster(
112                                            entry.getKey(),
113                                            portalCacheNames.toArray(
114                                                    new String[portalCacheNames.size()]));
115                            }
116                    }
117                    catch (Exception e) {
118                            if (_log.isWarnEnabled()) {
119                                    _log.warn("Unable to load cache data from the cluster", e);
120                            }
121                    }
122                    finally {
123                            _deferredPortalCaches.clear();
124                    }
125            }
126    
127            protected static boolean isSkipped() {
128                    return _skipBootstrapLoaderThreadLocal.get();
129            }
130    
131            protected static void loadCachesFromCluster(
132                            String portalCacheManagerName, String ... portalCacheNames)
133                    throws Exception {
134    
135                    synchronized (ClusterLinkBootstrapLoaderHelperUtil.class) {
136                            if (!_started) {
137                                    List<String> portalCaches = _deferredPortalCaches.get(
138                                            portalCacheManagerName);
139    
140                                    if (portalCaches == null) {
141                                            portalCaches = new ArrayList<String>();
142    
143                                            _deferredPortalCaches.put(
144                                                    portalCacheManagerName, portalCaches);
145                                    }
146    
147                                    portalCaches.addAll(Arrays.asList(portalCacheNames));
148    
149                                    return;
150                            }
151                    }
152    
153                    List<Address> clusterNodeAddresses =
154                            ClusterExecutorUtil.getClusterNodeAddresses();
155    
156                    if (_log.isInfoEnabled()) {
157                            _log.info("Cluster node addresses " + clusterNodeAddresses);
158                    }
159    
160                    if (clusterNodeAddresses.size() <= 1) {
161                            if (_log.isDebugEnabled()) {
162                                    _log.debug(
163                                            "Not loading cache from cluster because a cluster peer " +
164                                                    "was not found");
165                            }
166    
167                            return;
168                    }
169    
170                    PortalCacheManager<? extends Serializable, ?> portalCacheManager =
171                            PortalCacheProvider.getPortalCacheManager(portalCacheManagerName);
172    
173                    if (!portalCacheManager.isClusterAware()) {
174                            return;
175                    }
176    
177                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
178                            new MethodHandler(
179                                    _createServerSocketFromClusterMethodKey, portalCacheManagerName,
180                                    Arrays.asList(portalCacheNames)),
181                            true);
182    
183                    FutureClusterResponses futureClusterResponses =
184                            ClusterExecutorUtil.execute(clusterRequest);
185    
186                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
187                            futureClusterResponses.getPartialResults();
188    
189                    ClusterNodeResponse clusterNodeResponse = null;
190    
191                    try {
192                            clusterNodeResponse = clusterNodeResponses.poll(
193                                    PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
194                                    TimeUnit.MILLISECONDS);
195                    }
196                    catch (InterruptedException ie) {
197                            return;
198                    }
199    
200                    if (clusterNodeResponse == null) {
201                            if (_log.isWarnEnabled()) {
202                                    _log.warn(
203                                            "Unable to load cache from the cluster because there " +
204                                                    "was no peer response");
205                            }
206    
207                            return;
208                    }
209    
210                    if (_log.isInfoEnabled()) {
211                            _log.info(
212                                    "Load cache data from cluster node " +
213                                            clusterNodeResponse.getClusterNode());
214                    }
215    
216                    Socket socket = null;
217    
218                    try {
219                            SocketAddress remoteSocketAddress =
220                                    (SocketAddress)clusterNodeResponse.getResult();
221    
222                            if (remoteSocketAddress == null) {
223                                    _log.error(
224                                            "Cluster peer " + clusterNodeResponse.getClusterNode() +
225                                                    " responded with a null socket address");
226    
227                                    return;
228                            }
229    
230                            socket = new Socket();
231    
232                            socket.connect(remoteSocketAddress);
233    
234                            socket.shutdownOutput();
235    
236                            ObjectInputStream objectInputStream =
237                                    new AnnotatedObjectInputStream(socket.getInputStream());
238    
239                            PortalCache<Serializable, Serializable> portalCache = null;
240    
241                            try {
242                                    while (true) {
243                                            Object object = objectInputStream.readObject();
244    
245                                            if (object instanceof CacheElement) {
246                                                    CacheElement cacheElement = (CacheElement)object;
247    
248                                                    portalCache.putQuiet(
249                                                            cacheElement.getKey(), cacheElement.getValue());
250                                            }
251                                            else if (object instanceof String) {
252                                                    if (_COMMAND_SOCKET_CLOSE.equals(object)) {
253                                                            break;
254                                                    }
255    
256                                                    _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
257    
258                                                    try {
259                                                            portalCache =
260                                                                    (PortalCache<Serializable, Serializable>)
261                                                                            portalCacheManager.getCache((String)object);
262                                                    }
263                                                    finally {
264                                                            _skipBootstrapLoaderThreadLocal.remove();
265                                                    }
266                                            }
267                                            else {
268                                                    throw new SystemException(
269                                                            "Socket input stream returned invalid object " +
270                                                                    object);
271                                            }
272                                    }
273                            }
274                            finally {
275                                    if (objectInputStream != null) {
276                                            objectInputStream.close();
277                                    }
278                            }
279                    }
280                    catch (Exception e) {
281                            throw new Exception(
282                                    "Unable to load cache data from cluster node " +
283                                            clusterNodeResponse.getClusterNode(),
284                                    e);
285                    }
286                    finally {
287                            if (socket != null) {
288                                    socket.close();
289                            }
290                    }
291            }
292    
        // Sentinel string written by the server thread after the last cache; the
        // loading side stops reading when it receives it

        private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";

        private static final Log _log = LogFactoryUtil.getLog(
                ClusterLinkBootstrapLoaderHelperUtil.class);

        // Identifies createServerSocketFromCluster(String, List) for the method
        // handler that is sent to the cluster peers

        private static final MethodKey _createServerSocketFromClusterMethodKey =
                new MethodKey(
                        ClusterLinkBootstrapLoaderHelperUtil.class,
                        "createServerSocketFromCluster", String.class, List.class);

        // Portal cache names, keyed by portal cache manager name, whose loading
        // was requested before start(); accessed while holding the class lock

        private static final Map<String, List<String>> _deferredPortalCaches =
                new HashMap<String, List<String>>();

        // Applies the accept timeout to every server socket created here

        private static final ServerSocketConfigurator _serverSocketConfigurator =
                new SocketCacheServerSocketConfiguration();

        // Per thread flag, initially false, that is set to true only while this
        // class itself looks up a portal cache; exposed through isSkipped()

        private static final ThreadLocal<Boolean> _skipBootstrapLoaderThreadLocal =
                new InitialThreadLocal<Boolean>(
                        ClusterLinkBootstrapLoaderHelperUtil.class +
                                "._skipBootstrapLoaderThreadLocal",
                        false);

        // Set to true by start(); until then load requests are deferred

        private static boolean _started;
312    
        /**
         * Immutable key/value pair that is written to and read from the bootstrap
         * socket as a serialized object, one instance per cache entry.
         *
         * <p>
         * NOTE(review): no <code>serialVersionUID</code> is declared, so sender
         * and receiver presumably need to run the same compiled version of this
         * class; confirm before changing its fields or signatures.
         * </p>
         */
        private static class CacheElement implements Serializable {

                /**
                 * @param key the cache entry's key
                 * @param value the cache entry's value
                 */
                public CacheElement(Serializable key, Serializable value) {
                        _key = key;
                        _value = value;
                }

                /**
                 * @return the cache entry's key
                 */
                public Serializable getKey() {
                        return _key;
                }

                /**
                 * @return the cache entry's value
                 */
                public Serializable getValue() {
                        return _value;
                }

                private final Serializable _key;
                private final Serializable _value;

        }
332    
333            private static class ClusterLinkBootstrapLoaderServerThread extends Thread {
334    
335                    public ClusterLinkBootstrapLoaderServerThread(
336                            ServerSocket serverSocket, String portalCacheManagerName,
337                            List<String> portalCacheNames) {
338    
339                            _serverSocket = serverSocket;
340                            _portalCacheManagerName = portalCacheManagerName;
341                            _portalCacheNames = portalCacheNames;
342    
343                            setDaemon(true);
344                            setName(
345                                    ClusterLinkBootstrapLoaderServerThread.class.getName() + " - " +
346                                            portalCacheNames);
347                            setPriority(Thread.NORM_PRIORITY);
348                    }
349    
350                    @Override
351                    public void run() {
352                            Socket socket = null;
353    
354                            try {
355                                    try {
356                                            socket = _serverSocket.accept();
357                                    }
358                                    catch (SocketTimeoutException ste) {
359                                            if (_log.isDebugEnabled()) {
360                                                    _log.debug(
361                                                            "Terminating the socket thread " + getName() +
362                                                                    " that the client requested but never used");
363                                            }
364    
365                                            return;
366                                    }
367                                    finally {
368                                            _serverSocket.close();
369                                    }
370    
371                                    socket.shutdownInput();
372    
373                                    try (OutputStream outputStream = socket.getOutputStream();
374                                            ObjectOutputStream objectOutputStream =
375                                                    new AnnotatedObjectOutputStream(outputStream)) {
376    
377                                            PortalCacheManager<? extends Serializable, ?>
378                                                    portalCacheManager =
379                                                            PortalCacheProvider.getPortalCacheManager(
380                                                                    _portalCacheManagerName);
381    
382                                            for (String portalCacheName : _portalCacheNames) {
383                                                    PortalCache<Serializable, Serializable> portalCache =
384                                                            (PortalCache<Serializable, Serializable>)
385                                                            portalCacheManager.getCache(portalCacheName);
386    
387                                                    if (portalCache == null) {
388                                                            _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
389    
390                                                            try {
391                                                                    portalCacheManager.getCache(portalCacheName);
392                                                            }
393                                                            finally {
394                                                                    _skipBootstrapLoaderThreadLocal.remove();
395                                                            }
396    
397                                                            continue;
398                                                    }
399    
400                                                    objectOutputStream.writeObject(portalCacheName);
401    
402                                                    List<Serializable> keys = portalCache.getKeys();
403    
404                                                    for (Serializable key : keys) {
405                                                            Serializable value = portalCache.get(key);
406    
407                                                            CacheElement cacheElement = new CacheElement(
408                                                                    key, value);
409    
410                                                            objectOutputStream.writeObject(cacheElement);
411                                                    }
412                                            }
413    
414                                            objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
415                                    }
416                            }
417                            catch (Exception e) {
418                                    throw new RuntimeException(e);
419                            }
420                            finally {
421                                    if (socket != null) {
422                                            try {
423                                                    socket.close();
424                                            }
425                                            catch (IOException ioe) {
426                                                    throw new RuntimeException(ioe);
427                                            }
428                                    }
429                            }
430                    }
431    
432                    private final String _portalCacheManagerName;
433                    private final List<String> _portalCacheNames;
434                    private final ServerSocket _serverSocket;
435    
436            }
437    
438            private static class SocketCacheServerSocketConfiguration
439                    implements ServerSocketConfigurator {
440    
441                    @Override
442                    public void configure(ServerSocket serverSocket)
443                            throws SocketException {
444    
445                            serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
446                    }
447    
448            }
449    
450    }