001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cache.bootstrap;
016    
017    import com.liferay.portal.kernel.cache.PortalCache;
018    import com.liferay.portal.kernel.cache.PortalCacheHelperUtil;
019    import com.liferay.portal.kernel.cache.PortalCacheManager;
020    import com.liferay.portal.kernel.cache.PortalCacheProvider;
021    import com.liferay.portal.kernel.cluster.Address;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
029    import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
030    import com.liferay.portal.kernel.log.Log;
031    import com.liferay.portal.kernel.log.LogFactoryUtil;
032    import com.liferay.portal.kernel.util.InitialThreadLocal;
033    import com.liferay.portal.kernel.util.MethodHandler;
034    import com.liferay.portal.kernel.util.MethodKey;
035    import com.liferay.portal.kernel.util.SocketUtil;
036    import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
037    import com.liferay.portal.util.PropsValues;
038    
039    import java.io.IOException;
040    import java.io.ObjectInputStream;
041    import java.io.ObjectOutputStream;
042    import java.io.OutputStream;
043    import java.io.Serializable;
044    
045    import java.net.ServerSocket;
046    import java.net.Socket;
047    import java.net.SocketAddress;
048    import java.net.SocketException;
049    import java.net.SocketTimeoutException;
050    
051    import java.nio.channels.ServerSocketChannel;
052    
053    import java.util.ArrayList;
054    import java.util.Arrays;
055    import java.util.HashMap;
056    import java.util.List;
057    import java.util.Map;
058    import java.util.concurrent.BlockingQueue;
059    import java.util.concurrent.TimeUnit;
060    
061    /**
062     * @author Shuyang Zhou
063     * @author Sherry Yang
064     */
065    public class ClusterLinkBootstrapLoaderHelperUtil {
066    
067            public static SocketAddress createServerSocketFromCluster(
068                            String portalCacheManagerName, List<String> portalCacheNames)
069                    throws Exception {
070    
071                    ServerSocketChannel serverSocketChannel =
072                            SocketUtil.createServerSocketChannel(
073                                    ClusterLinkUtil.getBindInetAddress(),
074                                    PropsValues.EHCACHE_SOCKET_START_PORT,
075                                    _serverSocketConfigurator);
076    
077                    ServerSocket serverSocket = serverSocketChannel.socket();
078    
079                    ClusterLinkBootstrapLoaderServerThread
080                            clusterLinkBootstrapLoaderServerThread =
081                                    new ClusterLinkBootstrapLoaderServerThread(
082                                            serverSocket, portalCacheManagerName, portalCacheNames);
083    
084                    clusterLinkBootstrapLoaderServerThread.start();
085    
086                    return serverSocket.getLocalSocketAddress();
087            }
088    
089            public static synchronized void start() {
090                    if (!_started) {
091                            _started = true;
092                    }
093    
094                    if (_deferredPortalCaches.isEmpty()) {
095                            return;
096                    }
097    
098                    if (_log.isDebugEnabled()) {
099                            _log.debug("Loading deferred caches");
100                    }
101    
102                    try {
103                            for (Map.Entry<String, List<String>> entry :
104                                            _deferredPortalCaches.entrySet()) {
105    
106                                    List<String> portalCacheNames = entry.getValue();
107    
108                                    if (portalCacheNames.isEmpty()) {
109                                            continue;
110                                    }
111    
112                                    loadCachesFromCluster(
113                                            entry.getKey(),
114                                            portalCacheNames.toArray(
115                                                    new String[portalCacheNames.size()]));
116                            }
117                    }
118                    catch (Exception e) {
119                            if (_log.isWarnEnabled()) {
120                                    _log.warn("Unable to load cache data from the cluster", e);
121                            }
122                    }
123                    finally {
124                            _deferredPortalCaches.clear();
125                    }
126            }
127    
128            protected static boolean isSkipped() {
129                    return _skipBootstrapLoaderThreadLocal.get();
130            }
131    
132            protected static void loadCachesFromCluster(
133                            String portalCacheManagerName, String ... portalCacheNames)
134                    throws Exception {
135    
136                    synchronized (ClusterLinkBootstrapLoaderHelperUtil.class) {
137                            if (!_started) {
138                                    List<String> portalCaches = _deferredPortalCaches.get(
139                                            portalCacheManagerName);
140    
141                                    if (portalCaches == null) {
142                                            portalCaches = new ArrayList<String>();
143    
144                                            _deferredPortalCaches.put(
145                                                    portalCacheManagerName, portalCaches);
146                                    }
147    
148                                    portalCaches.addAll(Arrays.asList(portalCacheNames));
149    
150                                    return;
151                            }
152                    }
153    
154                    List<Address> clusterNodeAddresses =
155                            ClusterExecutorUtil.getClusterNodeAddresses();
156    
157                    if (_log.isInfoEnabled()) {
158                            _log.info("Cluster node addresses " + clusterNodeAddresses);
159                    }
160    
161                    if (clusterNodeAddresses.size() <= 1) {
162                            if (_log.isDebugEnabled()) {
163                                    _log.debug(
164                                            "Not loading cache from cluster because a cluster peer " +
165                                                    "was not found");
166                            }
167    
168                            return;
169                    }
170    
171                    PortalCacheManager<? extends Serializable, ?> portalCacheManager =
172                            PortalCacheProvider.getPortalCacheManager(portalCacheManagerName);
173    
174                    if (!portalCacheManager.isClusterAware()) {
175                            return;
176                    }
177    
178                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
179                            new MethodHandler(
180                                    _createServerSocketFromClusterMethodKey, portalCacheManagerName,
181                                    Arrays.asList(portalCacheNames)),
182                            true);
183    
184                    FutureClusterResponses futureClusterResponses =
185                            ClusterExecutorUtil.execute(clusterRequest);
186    
187                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
188                            futureClusterResponses.getPartialResults();
189    
190                    ClusterNodeResponse clusterNodeResponse = null;
191    
192                    try {
193                            clusterNodeResponse = clusterNodeResponses.poll(
194                                    PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
195                                    TimeUnit.MILLISECONDS);
196                    }
197                    catch (InterruptedException ie) {
198                            return;
199                    }
200    
201                    if (clusterNodeResponse == null) {
202                            if (_log.isWarnEnabled()) {
203                                    _log.warn(
204                                            "Unable to load cache from the cluster because there " +
205                                                    "was no peer response");
206                            }
207    
208                            return;
209                    }
210    
211                    if (_log.isInfoEnabled()) {
212                            _log.info(
213                                    "Load cache data from cluster node " +
214                                            clusterNodeResponse.getClusterNode());
215                    }
216    
217                    Socket socket = null;
218    
219                    try {
220                            SocketAddress remoteSocketAddress =
221                                    (SocketAddress)clusterNodeResponse.getResult();
222    
223                            if (remoteSocketAddress == null) {
224                                    _log.error(
225                                            "Cluster peer " + clusterNodeResponse.getClusterNode() +
226                                                    " responded with a null socket address");
227    
228                                    return;
229                            }
230    
231                            socket = new Socket();
232    
233                            socket.connect(remoteSocketAddress);
234    
235                            socket.shutdownOutput();
236    
237                            ObjectInputStream objectInputStream =
238                                    new AnnotatedObjectInputStream(socket.getInputStream());
239    
240                            PortalCache<Serializable, Serializable> portalCache = null;
241    
242                            try {
243                                    while (true) {
244                                            Object object = objectInputStream.readObject();
245    
246                                            if (object instanceof CacheElement) {
247                                                    CacheElement cacheElement = (CacheElement)object;
248    
249                                                    PortalCacheHelperUtil.putWithoutReplicator(
250                                                            portalCache, cacheElement.getKey(),
251                                                            cacheElement.getValue());
252                                            }
253                                            else if (object instanceof String) {
254                                                    if (_COMMAND_SOCKET_CLOSE.equals(object)) {
255                                                            break;
256                                                    }
257    
258                                                    _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
259    
260                                                    try {
261                                                            portalCache =
262                                                                    (PortalCache<Serializable, Serializable>)
263                                                                            portalCacheManager.getCache((String)object);
264                                                    }
265                                                    finally {
266                                                            _skipBootstrapLoaderThreadLocal.remove();
267                                                    }
268                                            }
269                                            else {
270                                                    throw new SystemException(
271                                                            "Socket input stream returned invalid object " +
272                                                                    object);
273                                            }
274                                    }
275                            }
276                            finally {
277                                    if (objectInputStream != null) {
278                                            objectInputStream.close();
279                                    }
280                            }
281                    }
282                    catch (Exception e) {
283                            throw new Exception(
284                                    "Unable to load cache data from cluster node " +
285                                            clusterNodeResponse.getClusterNode(),
286                                    e);
287                    }
288                    finally {
289                            if (socket != null) {
290                                    socket.close();
291                            }
292                    }
293            }
294    
295            private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
296    
297            private static final Log _log = LogFactoryUtil.getLog(
298                    ClusterLinkBootstrapLoaderHelperUtil.class);
299    
300            private static final MethodKey _createServerSocketFromClusterMethodKey =
301                    new MethodKey(
302                            ClusterLinkBootstrapLoaderHelperUtil.class,
303                            "createServerSocketFromCluster", String.class, List.class);
304            private static final Map<String, List<String>> _deferredPortalCaches =
305                    new HashMap<String, List<String>>();
306            private static final ServerSocketConfigurator _serverSocketConfigurator =
307                    new SocketCacheServerSocketConfiguration();
308            private static final ThreadLocal<Boolean> _skipBootstrapLoaderThreadLocal =
309                    new InitialThreadLocal<Boolean>(
310                            ClusterLinkBootstrapLoaderHelperUtil.class +
311                                    "._skipBootstrapLoaderThreadLocal",
312                            false);
313            private static boolean _started;
314    
315            private static class CacheElement implements Serializable {
316    
317                    public CacheElement(Serializable key, Serializable value) {
318                            _key = key;
319                            _value = value;
320                    }
321    
322                    public Serializable getKey() {
323                            return _key;
324                    }
325    
326                    public Serializable getValue() {
327                            return _value;
328                    }
329    
330                    private final Serializable _key;
331                    private final Serializable _value;
332    
333            }
334    
335            private static class ClusterLinkBootstrapLoaderServerThread extends Thread {
336    
337                    public ClusterLinkBootstrapLoaderServerThread(
338                            ServerSocket serverSocket, String portalCacheManagerName,
339                            List<String> portalCacheNames) {
340    
341                            _serverSocket = serverSocket;
342                            _portalCacheManagerName = portalCacheManagerName;
343                            _portalCacheNames = portalCacheNames;
344    
345                            setDaemon(true);
346                            setName(
347                                    ClusterLinkBootstrapLoaderServerThread.class.getName() + " - " +
348                                            portalCacheNames);
349                            setPriority(Thread.NORM_PRIORITY);
350                    }
351    
352                    @Override
353                    public void run() {
354                            Socket socket = null;
355    
356                            try {
357                                    try {
358                                            socket = _serverSocket.accept();
359                                    }
360                                    catch (SocketTimeoutException ste) {
361                                            if (_log.isDebugEnabled()) {
362                                                    _log.debug(
363                                                            "Terminating the socket thread " + getName() +
364                                                                    " that the client requested but never used");
365                                            }
366    
367                                            return;
368                                    }
369                                    finally {
370                                            _serverSocket.close();
371                                    }
372    
373                                    socket.shutdownInput();
374    
375                                    try (OutputStream outputStream = socket.getOutputStream();
376                                            ObjectOutputStream objectOutputStream =
377                                                    new AnnotatedObjectOutputStream(outputStream)) {
378    
379                                            PortalCacheManager<? extends Serializable, ?>
380                                                    portalCacheManager =
381                                                            PortalCacheProvider.getPortalCacheManager(
382                                                                    _portalCacheManagerName);
383    
384                                            for (String portalCacheName : _portalCacheNames) {
385                                                    PortalCache<Serializable, Serializable> portalCache =
386                                                            (PortalCache<Serializable, Serializable>)
387                                                            portalCacheManager.getCache(portalCacheName);
388    
389                                                    if (portalCache == null) {
390                                                            _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
391    
392                                                            try {
393                                                                    portalCacheManager.getCache(portalCacheName);
394                                                            }
395                                                            finally {
396                                                                    _skipBootstrapLoaderThreadLocal.remove();
397                                                            }
398    
399                                                            continue;
400                                                    }
401    
402                                                    objectOutputStream.writeObject(portalCacheName);
403    
404                                                    List<Serializable> keys = portalCache.getKeys();
405    
406                                                    for (Serializable key : keys) {
407                                                            Serializable value = portalCache.get(key);
408    
409                                                            CacheElement cacheElement = new CacheElement(
410                                                                    key, value);
411    
412                                                            objectOutputStream.writeObject(cacheElement);
413                                                    }
414                                            }
415    
416                                            objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
417                                    }
418                            }
419                            catch (Exception e) {
420                                    throw new RuntimeException(e);
421                            }
422                            finally {
423                                    if (socket != null) {
424                                            try {
425                                                    socket.close();
426                                            }
427                                            catch (IOException ioe) {
428                                                    throw new RuntimeException(ioe);
429                                            }
430                                    }
431                            }
432                    }
433    
434                    private final String _portalCacheManagerName;
435                    private final List<String> _portalCacheNames;
436                    private final ServerSocket _serverSocket;
437    
438            }
439    
440            private static class SocketCacheServerSocketConfiguration
441                    implements ServerSocketConfigurator {
442    
443                    @Override
444                    public void configure(ServerSocket serverSocket)
445                            throws SocketException {
446    
447                            serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
448                    }
449    
450            }
451    
452    }