001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.cache.bootstrap;
016    
017    import com.liferay.portal.kernel.cache.PortalCache;
018    import com.liferay.portal.kernel.cache.PortalCacheHelperUtil;
019    import com.liferay.portal.kernel.cache.PortalCacheManager;
020    import com.liferay.portal.kernel.cache.PortalCacheProvider;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterNode;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.exception.SystemException;
027    import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
028    import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
029    import com.liferay.portal.kernel.log.Log;
030    import com.liferay.portal.kernel.log.LogFactoryUtil;
031    import com.liferay.portal.kernel.util.GetterUtil;
032    import com.liferay.portal.kernel.util.InitialThreadLocal;
033    import com.liferay.portal.kernel.util.MethodHandler;
034    import com.liferay.portal.kernel.util.MethodKey;
035    import com.liferay.portal.kernel.util.PropsKeys;
036    import com.liferay.portal.kernel.util.PropsUtil;
037    import com.liferay.portal.kernel.util.SocketUtil;
038    import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
039    
040    import java.io.IOException;
041    import java.io.ObjectInputStream;
042    import java.io.ObjectOutputStream;
043    import java.io.OutputStream;
044    import java.io.Serializable;
045    
046    import java.net.ServerSocket;
047    import java.net.Socket;
048    import java.net.SocketAddress;
049    import java.net.SocketException;
050    import java.net.SocketTimeoutException;
051    
052    import java.nio.channels.ServerSocketChannel;
053    
054    import java.util.ArrayList;
055    import java.util.Arrays;
056    import java.util.HashMap;
057    import java.util.List;
058    import java.util.Map;
059    import java.util.concurrent.BlockingQueue;
060    import java.util.concurrent.TimeUnit;
061    
062    /**
063     * @author Shuyang Zhou
064     * @author Sherry Yang
065     */
066    public class ClusterLinkBootstrapLoaderHelperUtil {
067    
068            public static SocketAddress createServerSocketFromCluster(
069                            String portalCacheManagerName, List<String> portalCacheNames)
070                    throws Exception {
071    
072                    ClusterNode localClusterNode =
073                            ClusterExecutorUtil.getLocalClusterNode();
074    
075                    ServerSocketChannel serverSocketChannel =
076                            SocketUtil.createServerSocketChannel(
077                                    localClusterNode.getBindInetAddress(),
078                                    GetterUtil.getInteger(
079                                            PropsUtil.get(PropsKeys.EHCACHE_SOCKET_START_PORT), 32454),
080                                    _serverSocketConfigurator);
081    
082                    ServerSocket serverSocket = serverSocketChannel.socket();
083    
084                    ClusterLinkBootstrapLoaderServerThread
085                            clusterLinkBootstrapLoaderServerThread =
086                                    new ClusterLinkBootstrapLoaderServerThread(
087                                            serverSocket, portalCacheManagerName, portalCacheNames);
088    
089                    clusterLinkBootstrapLoaderServerThread.start();
090    
091                    return serverSocket.getLocalSocketAddress();
092            }
093    
094            public static boolean isSkipped() {
095                    return _skipBootstrapLoaderThreadLocal.get();
096            }
097    
098            public static void loadCachesFromCluster(
099                            String portalCacheManagerName, String ... portalCacheNames)
100                    throws Exception {
101    
102                    synchronized (ClusterLinkBootstrapLoaderHelperUtil.class) {
103                            if (!_started) {
104                                    List<String> portalCaches = _deferredPortalCaches.get(
105                                            portalCacheManagerName);
106    
107                                    if (portalCaches == null) {
108                                            portalCaches = new ArrayList<>();
109    
110                                            _deferredPortalCaches.put(
111                                                    portalCacheManagerName, portalCaches);
112                                    }
113    
114                                    portalCaches.addAll(Arrays.asList(portalCacheNames));
115    
116                                    return;
117                            }
118                    }
119    
120                    List<ClusterNode> clusterNodes = ClusterExecutorUtil.getClusterNodes();
121    
122                    if (_log.isInfoEnabled()) {
123                            _log.info("Cluster nodes " + clusterNodes);
124                    }
125    
126                    if (clusterNodes.size() <= 1) {
127                            if (_log.isDebugEnabled()) {
128                                    _log.debug(
129                                            "Not loading cache from cluster because a cluster peer " +
130                                                    "was not found");
131                            }
132    
133                            return;
134                    }
135    
136                    PortalCacheManager<? extends Serializable, ?> portalCacheManager =
137                            PortalCacheProvider.getPortalCacheManager(portalCacheManagerName);
138    
139                    if (!portalCacheManager.isClusterAware()) {
140                            return;
141                    }
142    
143                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
144                            new MethodHandler(
145                                    _createServerSocketFromClusterMethodKey, portalCacheManagerName,
146                                    Arrays.asList(portalCacheNames)),
147                            true);
148    
149                    FutureClusterResponses futureClusterResponses =
150                            ClusterExecutorUtil.execute(clusterRequest);
151    
152                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
153                            futureClusterResponses.getPartialResults();
154    
155                    ClusterNodeResponse clusterNodeResponse = null;
156    
157                    try {
158                            clusterNodeResponse = clusterNodeResponses.poll(
159                                    GetterUtil.getLong(
160                                            PropsUtil.get(
161                                                    PropsKeys.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
162                                            10000),
163                                    TimeUnit.MILLISECONDS);
164                    }
165                    catch (InterruptedException ie) {
166                            return;
167                    }
168    
169                    if (clusterNodeResponse == null) {
170                            if (_log.isWarnEnabled()) {
171                                    _log.warn(
172                                            "Unable to load cache from the cluster because there " +
173                                                    "was no peer response");
174                            }
175    
176                            return;
177                    }
178    
179                    if (_log.isInfoEnabled()) {
180                            _log.info(
181                                    "Load cache data from cluster node " +
182                                            clusterNodeResponse.getClusterNode());
183                    }
184    
185                    Socket socket = null;
186    
187                    try {
188                            SocketAddress remoteSocketAddress =
189                                    (SocketAddress)clusterNodeResponse.getResult();
190    
191                            if (remoteSocketAddress == null) {
192                                    _log.error(
193                                            "Cluster peer " + clusterNodeResponse.getClusterNode() +
194                                                    " responded with a null socket address");
195    
196                                    return;
197                            }
198    
199                            socket = new Socket();
200    
201                            socket.connect(remoteSocketAddress);
202    
203                            socket.shutdownOutput();
204    
205                            ObjectInputStream objectInputStream =
206                                    new AnnotatedObjectInputStream(socket.getInputStream());
207    
208                            PortalCache<Serializable, Serializable> portalCache = null;
209    
210                            try {
211                                    while (true) {
212                                            Object object = objectInputStream.readObject();
213    
214                                            if (object instanceof CacheElement) {
215                                                    CacheElement cacheElement = (CacheElement)object;
216    
217                                                    PortalCacheHelperUtil.putWithoutReplicator(
218                                                            portalCache, cacheElement.getKey(),
219                                                            cacheElement.getValue());
220                                            }
221                                            else if (object instanceof String) {
222                                                    if (_COMMAND_SOCKET_CLOSE.equals(object)) {
223                                                            break;
224                                                    }
225    
226                                                    _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
227    
228                                                    try {
229                                                            portalCache =
230                                                                    (PortalCache<Serializable, Serializable>)
231                                                                            portalCacheManager.getCache((String)object);
232                                                    }
233                                                    finally {
234                                                            _skipBootstrapLoaderThreadLocal.remove();
235                                                    }
236                                            }
237                                            else {
238                                                    throw new SystemException(
239                                                            "Socket input stream returned invalid object " +
240                                                                    object);
241                                            }
242                                    }
243                            }
244                            finally {
245                                    if (objectInputStream != null) {
246                                            objectInputStream.close();
247                                    }
248                            }
249                    }
250                    catch (Exception e) {
251                            throw new Exception(
252                                    "Unable to load cache data from cluster node " +
253                                            clusterNodeResponse.getClusterNode(),
254                                    e);
255                    }
256                    finally {
257                            if (socket != null) {
258                                    socket.close();
259                            }
260                    }
261            }
262    
263            public static synchronized void start() {
264                    if (!_started) {
265                            _started = true;
266                    }
267    
268                    if (_deferredPortalCaches.isEmpty()) {
269                            return;
270                    }
271    
272                    if (_log.isDebugEnabled()) {
273                            _log.debug("Loading deferred caches");
274                    }
275    
276                    try {
277                            for (Map.Entry<String, List<String>> entry :
278                                            _deferredPortalCaches.entrySet()) {
279    
280                                    List<String> portalCacheNames = entry.getValue();
281    
282                                    if (portalCacheNames.isEmpty()) {
283                                            continue;
284                                    }
285    
286                                    loadCachesFromCluster(
287                                            entry.getKey(),
288                                            portalCacheNames.toArray(
289                                                    new String[portalCacheNames.size()]));
290                            }
291                    }
292                    catch (Exception e) {
293                            if (_log.isWarnEnabled()) {
294                                    _log.warn("Unable to load cache data from the cluster", e);
295                            }
296                    }
297                    finally {
298                            _deferredPortalCaches.clear();
299                    }
300            }
301    
302            private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
303    
304            private static final Log _log = LogFactoryUtil.getLog(
305                    ClusterLinkBootstrapLoaderHelperUtil.class);
306    
307            private static final MethodKey _createServerSocketFromClusterMethodKey =
308                    new MethodKey(
309                            ClusterLinkBootstrapLoaderHelperUtil.class,
310                            "createServerSocketFromCluster", String.class, List.class);
311            private static final Map<String, List<String>> _deferredPortalCaches =
312                    new HashMap<>();
313            private static final ServerSocketConfigurator _serverSocketConfigurator =
314                    new SocketCacheServerSocketConfiguration();
315            private static final ThreadLocal<Boolean> _skipBootstrapLoaderThreadLocal =
316                    new InitialThreadLocal<Boolean>(
317                            ClusterLinkBootstrapLoaderHelperUtil.class +
318                                    "._skipBootstrapLoaderThreadLocal",
319                            false);
320            private static boolean _started;
321    
322            private static class CacheElement implements Serializable {
323    
324                    public CacheElement(Serializable key, Serializable value) {
325                            _key = key;
326                            _value = value;
327                    }
328    
329                    public Serializable getKey() {
330                            return _key;
331                    }
332    
333                    public Serializable getValue() {
334                            return _value;
335                    }
336    
337                    private final Serializable _key;
338                    private final Serializable _value;
339    
340            }
341    
342            private static class ClusterLinkBootstrapLoaderServerThread extends Thread {
343    
344                    public ClusterLinkBootstrapLoaderServerThread(
345                            ServerSocket serverSocket, String portalCacheManagerName,
346                            List<String> portalCacheNames) {
347    
348                            _serverSocket = serverSocket;
349                            _portalCacheManagerName = portalCacheManagerName;
350                            _portalCacheNames = portalCacheNames;
351    
352                            setDaemon(true);
353                            setName(
354                                    ClusterLinkBootstrapLoaderServerThread.class.getName() + " - " +
355                                            portalCacheNames);
356                            setPriority(Thread.NORM_PRIORITY);
357                    }
358    
359                    @Override
360                    public void run() {
361                            Socket socket = null;
362    
363                            try {
364                                    try {
365                                            socket = _serverSocket.accept();
366                                    }
367                                    catch (SocketTimeoutException ste) {
368                                            if (_log.isDebugEnabled()) {
369                                                    _log.debug(
370                                                            "Terminating the socket thread " + getName() +
371                                                                    " that the client requested but never used");
372                                            }
373    
374                                            return;
375                                    }
376                                    finally {
377                                            _serverSocket.close();
378                                    }
379    
380                                    socket.shutdownInput();
381    
382                                    try (OutputStream outputStream = socket.getOutputStream();
383                                            ObjectOutputStream objectOutputStream =
384                                                    new AnnotatedObjectOutputStream(outputStream)) {
385    
386                                            PortalCacheManager<? extends Serializable, ?>
387                                                    portalCacheManager =
388                                                            PortalCacheProvider.getPortalCacheManager(
389                                                                    _portalCacheManagerName);
390    
391                                            for (String portalCacheName : _portalCacheNames) {
392                                                    PortalCache<Serializable, Serializable> portalCache =
393                                                            (PortalCache<Serializable, Serializable>)
394                                                            portalCacheManager.getCache(portalCacheName);
395    
396                                                    if (portalCache == null) {
397                                                            _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
398    
399                                                            try {
400                                                                    portalCacheManager.getCache(portalCacheName);
401                                                            }
402                                                            finally {
403                                                                    _skipBootstrapLoaderThreadLocal.remove();
404                                                            }
405    
406                                                            continue;
407                                                    }
408    
409                                                    objectOutputStream.writeObject(portalCacheName);
410    
411                                                    List<Serializable> keys = portalCache.getKeys();
412    
413                                                    for (Serializable key : keys) {
414                                                            Serializable value = portalCache.get(key);
415    
416                                                            CacheElement cacheElement = new CacheElement(
417                                                                    key, value);
418    
419                                                            objectOutputStream.writeObject(cacheElement);
420                                                    }
421                                            }
422    
423                                            objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
424                                    }
425                            }
426                            catch (Exception e) {
427                                    throw new RuntimeException(e);
428                            }
429                            finally {
430                                    if (socket != null) {
431                                            try {
432                                                    socket.close();
433                                            }
434                                            catch (IOException ioe) {
435                                                    throw new RuntimeException(ioe);
436                                            }
437                                    }
438                            }
439                    }
440    
441                    private final String _portalCacheManagerName;
442                    private final List<String> _portalCacheNames;
443                    private final ServerSocket _serverSocket;
444    
445            }
446    
447            private static class SocketCacheServerSocketConfiguration
448                    implements ServerSocketConfigurator {
449    
450                    @Override
451                    public void configure(ServerSocket serverSocket)
452                            throws SocketException {
453    
454                            serverSocket.setSoTimeout(
455                                    GetterUtil.getInteger(
456                                            PropsUtil.get(PropsKeys.EHCACHE_SOCKET_SO_TIMEOUT), 10000));
457                    }
458    
459            }
460    
461    }