001
014
015 package com.liferay.portal.kernel.cache.bootstrap;
016
017 import com.liferay.portal.kernel.cache.PortalCache;
018 import com.liferay.portal.kernel.cache.PortalCacheHelperUtil;
019 import com.liferay.portal.kernel.cache.PortalCacheManager;
020 import com.liferay.portal.kernel.cache.PortalCacheProvider;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterNode;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.exception.SystemException;
027 import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
028 import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
029 import com.liferay.portal.kernel.log.Log;
030 import com.liferay.portal.kernel.log.LogFactoryUtil;
031 import com.liferay.portal.kernel.util.GetterUtil;
032 import com.liferay.portal.kernel.util.InitialThreadLocal;
033 import com.liferay.portal.kernel.util.MethodHandler;
034 import com.liferay.portal.kernel.util.MethodKey;
035 import com.liferay.portal.kernel.util.PropsKeys;
036 import com.liferay.portal.kernel.util.PropsUtil;
037 import com.liferay.portal.kernel.util.SocketUtil;
038 import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
039
040 import java.io.IOException;
041 import java.io.ObjectInputStream;
042 import java.io.ObjectOutputStream;
043 import java.io.OutputStream;
044 import java.io.Serializable;
045
046 import java.net.ServerSocket;
047 import java.net.Socket;
048 import java.net.SocketAddress;
049 import java.net.SocketException;
050 import java.net.SocketTimeoutException;
051
052 import java.nio.channels.ServerSocketChannel;
053
054 import java.util.ArrayList;
055 import java.util.Arrays;
056 import java.util.HashMap;
057 import java.util.List;
058 import java.util.Map;
059 import java.util.concurrent.BlockingQueue;
060 import java.util.concurrent.TimeUnit;
061
062
066 public class ClusterLinkBootstrapLoaderHelperUtil {
067
068 public static SocketAddress createServerSocketFromCluster(
069 String portalCacheManagerName, List<String> portalCacheNames)
070 throws Exception {
071
072 ClusterNode localClusterNode =
073 ClusterExecutorUtil.getLocalClusterNode();
074
075 ServerSocketChannel serverSocketChannel =
076 SocketUtil.createServerSocketChannel(
077 localClusterNode.getBindInetAddress(),
078 GetterUtil.getInteger(
079 PropsUtil.get(PropsKeys.EHCACHE_SOCKET_START_PORT), 32454),
080 _serverSocketConfigurator);
081
082 ServerSocket serverSocket = serverSocketChannel.socket();
083
084 ClusterLinkBootstrapLoaderServerThread
085 clusterLinkBootstrapLoaderServerThread =
086 new ClusterLinkBootstrapLoaderServerThread(
087 serverSocket, portalCacheManagerName, portalCacheNames);
088
089 clusterLinkBootstrapLoaderServerThread.start();
090
091 return serverSocket.getLocalSocketAddress();
092 }
093
094 public static boolean isSkipped() {
095 return _skipBootstrapLoaderThreadLocal.get();
096 }
097
098 public static void loadCachesFromCluster(
099 String portalCacheManagerName, String ... portalCacheNames)
100 throws Exception {
101
102 synchronized (ClusterLinkBootstrapLoaderHelperUtil.class) {
103 if (!_started) {
104 List<String> portalCaches = _deferredPortalCaches.get(
105 portalCacheManagerName);
106
107 if (portalCaches == null) {
108 portalCaches = new ArrayList<>();
109
110 _deferredPortalCaches.put(
111 portalCacheManagerName, portalCaches);
112 }
113
114 portalCaches.addAll(Arrays.asList(portalCacheNames));
115
116 return;
117 }
118 }
119
120 List<ClusterNode> clusterNodes = ClusterExecutorUtil.getClusterNodes();
121
122 if (_log.isInfoEnabled()) {
123 _log.info("Cluster nodes " + clusterNodes);
124 }
125
126 if (clusterNodes.size() <= 1) {
127 if (_log.isDebugEnabled()) {
128 _log.debug(
129 "Not loading cache from cluster because a cluster peer " +
130 "was not found");
131 }
132
133 return;
134 }
135
136 PortalCacheManager<? extends Serializable, ?> portalCacheManager =
137 PortalCacheProvider.getPortalCacheManager(portalCacheManagerName);
138
139 if (!portalCacheManager.isClusterAware()) {
140 return;
141 }
142
143 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
144 new MethodHandler(
145 _createServerSocketFromClusterMethodKey, portalCacheManagerName,
146 Arrays.asList(portalCacheNames)),
147 true);
148
149 FutureClusterResponses futureClusterResponses =
150 ClusterExecutorUtil.execute(clusterRequest);
151
152 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
153 futureClusterResponses.getPartialResults();
154
155 ClusterNodeResponse clusterNodeResponse = null;
156
157 try {
158 clusterNodeResponse = clusterNodeResponses.poll(
159 GetterUtil.getLong(
160 PropsUtil.get(
161 PropsKeys.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
162 10000),
163 TimeUnit.MILLISECONDS);
164 }
165 catch (InterruptedException ie) {
166 return;
167 }
168
169 if (clusterNodeResponse == null) {
170 if (_log.isWarnEnabled()) {
171 _log.warn(
172 "Unable to load cache from the cluster because there " +
173 "was no peer response");
174 }
175
176 return;
177 }
178
179 if (_log.isInfoEnabled()) {
180 _log.info(
181 "Load cache data from cluster node " +
182 clusterNodeResponse.getClusterNode());
183 }
184
185 Socket socket = null;
186
187 try {
188 SocketAddress remoteSocketAddress =
189 (SocketAddress)clusterNodeResponse.getResult();
190
191 if (remoteSocketAddress == null) {
192 _log.error(
193 "Cluster peer " + clusterNodeResponse.getClusterNode() +
194 " responded with a null socket address");
195
196 return;
197 }
198
199 socket = new Socket();
200
201 socket.connect(remoteSocketAddress);
202
203 socket.shutdownOutput();
204
205 ObjectInputStream objectInputStream =
206 new AnnotatedObjectInputStream(socket.getInputStream());
207
208 PortalCache<Serializable, Serializable> portalCache = null;
209
210 try {
211 while (true) {
212 Object object = objectInputStream.readObject();
213
214 if (object instanceof CacheElement) {
215 CacheElement cacheElement = (CacheElement)object;
216
217 PortalCacheHelperUtil.putWithoutReplicator(
218 portalCache, cacheElement.getKey(),
219 cacheElement.getValue());
220 }
221 else if (object instanceof String) {
222 if (_COMMAND_SOCKET_CLOSE.equals(object)) {
223 break;
224 }
225
226 _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
227
228 try {
229 portalCache =
230 (PortalCache<Serializable, Serializable>)
231 portalCacheManager.getCache((String)object);
232 }
233 finally {
234 _skipBootstrapLoaderThreadLocal.remove();
235 }
236 }
237 else {
238 throw new SystemException(
239 "Socket input stream returned invalid object " +
240 object);
241 }
242 }
243 }
244 finally {
245 if (objectInputStream != null) {
246 objectInputStream.close();
247 }
248 }
249 }
250 catch (Exception e) {
251 throw new Exception(
252 "Unable to load cache data from cluster node " +
253 clusterNodeResponse.getClusterNode(),
254 e);
255 }
256 finally {
257 if (socket != null) {
258 socket.close();
259 }
260 }
261 }
262
263 public static synchronized void start() {
264 if (!_started) {
265 _started = true;
266 }
267
268 if (_deferredPortalCaches.isEmpty()) {
269 return;
270 }
271
272 if (_log.isDebugEnabled()) {
273 _log.debug("Loading deferred caches");
274 }
275
276 try {
277 for (Map.Entry<String, List<String>> entry :
278 _deferredPortalCaches.entrySet()) {
279
280 List<String> portalCacheNames = entry.getValue();
281
282 if (portalCacheNames.isEmpty()) {
283 continue;
284 }
285
286 loadCachesFromCluster(
287 entry.getKey(),
288 portalCacheNames.toArray(
289 new String[portalCacheNames.size()]));
290 }
291 }
292 catch (Exception e) {
293 if (_log.isWarnEnabled()) {
294 _log.warn("Unable to load cache data from the cluster", e);
295 }
296 }
297 finally {
298 _deferredPortalCaches.clear();
299 }
300 }
301
302 private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
303
304 private static final Log _log = LogFactoryUtil.getLog(
305 ClusterLinkBootstrapLoaderHelperUtil.class);
306
307 private static final MethodKey _createServerSocketFromClusterMethodKey =
308 new MethodKey(
309 ClusterLinkBootstrapLoaderHelperUtil.class,
310 "createServerSocketFromCluster", String.class, List.class);
311 private static final Map<String, List<String>> _deferredPortalCaches =
312 new HashMap<>();
313 private static final ServerSocketConfigurator _serverSocketConfigurator =
314 new SocketCacheServerSocketConfiguration();
315 private static final ThreadLocal<Boolean> _skipBootstrapLoaderThreadLocal =
316 new InitialThreadLocal<Boolean>(
317 ClusterLinkBootstrapLoaderHelperUtil.class +
318 "._skipBootstrapLoaderThreadLocal",
319 false);
320 private static boolean _started;
321
322 private static class CacheElement implements Serializable {
323
324 public CacheElement(Serializable key, Serializable value) {
325 _key = key;
326 _value = value;
327 }
328
329 public Serializable getKey() {
330 return _key;
331 }
332
333 public Serializable getValue() {
334 return _value;
335 }
336
337 private final Serializable _key;
338 private final Serializable _value;
339
340 }
341
342 private static class ClusterLinkBootstrapLoaderServerThread extends Thread {
343
344 public ClusterLinkBootstrapLoaderServerThread(
345 ServerSocket serverSocket, String portalCacheManagerName,
346 List<String> portalCacheNames) {
347
348 _serverSocket = serverSocket;
349 _portalCacheManagerName = portalCacheManagerName;
350 _portalCacheNames = portalCacheNames;
351
352 setDaemon(true);
353 setName(
354 ClusterLinkBootstrapLoaderServerThread.class.getName() + " - " +
355 portalCacheNames);
356 setPriority(Thread.NORM_PRIORITY);
357 }
358
359 @Override
360 public void run() {
361 Socket socket = null;
362
363 try {
364 try {
365 socket = _serverSocket.accept();
366 }
367 catch (SocketTimeoutException ste) {
368 if (_log.isDebugEnabled()) {
369 _log.debug(
370 "Terminating the socket thread " + getName() +
371 " that the client requested but never used");
372 }
373
374 return;
375 }
376 finally {
377 _serverSocket.close();
378 }
379
380 socket.shutdownInput();
381
382 try (OutputStream outputStream = socket.getOutputStream();
383 ObjectOutputStream objectOutputStream =
384 new AnnotatedObjectOutputStream(outputStream)) {
385
386 PortalCacheManager<? extends Serializable, ?>
387 portalCacheManager =
388 PortalCacheProvider.getPortalCacheManager(
389 _portalCacheManagerName);
390
391 for (String portalCacheName : _portalCacheNames) {
392 PortalCache<Serializable, Serializable> portalCache =
393 (PortalCache<Serializable, Serializable>)
394 portalCacheManager.getCache(portalCacheName);
395
396 if (portalCache == null) {
397 _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
398
399 try {
400 portalCacheManager.getCache(portalCacheName);
401 }
402 finally {
403 _skipBootstrapLoaderThreadLocal.remove();
404 }
405
406 continue;
407 }
408
409 objectOutputStream.writeObject(portalCacheName);
410
411 List<Serializable> keys = portalCache.getKeys();
412
413 for (Serializable key : keys) {
414 Serializable value = portalCache.get(key);
415
416 CacheElement cacheElement = new CacheElement(
417 key, value);
418
419 objectOutputStream.writeObject(cacheElement);
420 }
421 }
422
423 objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
424 }
425 }
426 catch (Exception e) {
427 throw new RuntimeException(e);
428 }
429 finally {
430 if (socket != null) {
431 try {
432 socket.close();
433 }
434 catch (IOException ioe) {
435 throw new RuntimeException(ioe);
436 }
437 }
438 }
439 }
440
441 private final String _portalCacheManagerName;
442 private final List<String> _portalCacheNames;
443 private final ServerSocket _serverSocket;
444
445 }
446
447 private static class SocketCacheServerSocketConfiguration
448 implements ServerSocketConfigurator {
449
450 @Override
451 public void configure(ServerSocket serverSocket)
452 throws SocketException {
453
454 serverSocket.setSoTimeout(
455 GetterUtil.getInteger(
456 PropsUtil.get(PropsKeys.EHCACHE_SOCKET_SO_TIMEOUT), 10000));
457 }
458
459 }
460
461 }