001
014
015 package com.liferay.portal.cache.bootstrap;
016
017 import com.liferay.portal.kernel.cache.PortalCache;
018 import com.liferay.portal.kernel.cache.PortalCacheManager;
019 import com.liferay.portal.kernel.cache.PortalCacheProvider;
020 import com.liferay.portal.kernel.cluster.Address;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.exception.SystemException;
027 import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
028 import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
029 import com.liferay.portal.kernel.log.Log;
030 import com.liferay.portal.kernel.log.LogFactoryUtil;
031 import com.liferay.portal.kernel.util.InitialThreadLocal;
032 import com.liferay.portal.kernel.util.MethodHandler;
033 import com.liferay.portal.kernel.util.MethodKey;
034 import com.liferay.portal.kernel.util.SocketUtil;
035 import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
036 import com.liferay.portal.util.PropsValues;
037
038 import java.io.IOException;
039 import java.io.ObjectInputStream;
040 import java.io.ObjectOutputStream;
041 import java.io.OutputStream;
042 import java.io.Serializable;
043
044 import java.net.ServerSocket;
045 import java.net.Socket;
046 import java.net.SocketAddress;
047 import java.net.SocketException;
048 import java.net.SocketTimeoutException;
049
050 import java.nio.channels.ServerSocketChannel;
051
052 import java.util.ArrayList;
053 import java.util.Arrays;
054 import java.util.HashMap;
055 import java.util.List;
056 import java.util.Map;
057 import java.util.concurrent.BlockingQueue;
058 import java.util.concurrent.TimeUnit;
059
060
064 public class ClusterLinkBootstrapLoaderHelperUtil {
065
066 public static SocketAddress createServerSocketFromCluster(
067 String portalCacheManagerName, List<String> portalCacheNames)
068 throws Exception {
069
070 ServerSocketChannel serverSocketChannel =
071 SocketUtil.createServerSocketChannel(
072 ClusterLinkUtil.getBindInetAddress(),
073 PropsValues.EHCACHE_SOCKET_START_PORT,
074 _serverSocketConfigurator);
075
076 ServerSocket serverSocket = serverSocketChannel.socket();
077
078 ClusterLinkBootstrapLoaderServerThread
079 clusterLinkBootstrapLoaderServerThread =
080 new ClusterLinkBootstrapLoaderServerThread(
081 serverSocket, portalCacheManagerName, portalCacheNames);
082
083 clusterLinkBootstrapLoaderServerThread.start();
084
085 return serverSocket.getLocalSocketAddress();
086 }
087
088 public static synchronized void start() {
089 if (!_started) {
090 _started = true;
091 }
092
093 if (_deferredPortalCaches.isEmpty()) {
094 return;
095 }
096
097 if (_log.isDebugEnabled()) {
098 _log.debug("Loading deferred caches");
099 }
100
101 try {
102 for (Map.Entry<String, List<String>> entry :
103 _deferredPortalCaches.entrySet()) {
104
105 List<String> portalCacheNames = entry.getValue();
106
107 if (portalCacheNames.isEmpty()) {
108 continue;
109 }
110
111 loadCachesFromCluster(
112 entry.getKey(),
113 portalCacheNames.toArray(
114 new String[portalCacheNames.size()]));
115 }
116 }
117 catch (Exception e) {
118 if (_log.isWarnEnabled()) {
119 _log.warn("Unable to load cache data from the cluster", e);
120 }
121 }
122 finally {
123 _deferredPortalCaches.clear();
124 }
125 }
126
127 protected static boolean isSkipped() {
128 return _skipBootstrapLoaderThreadLocal.get();
129 }
130
131 protected static void loadCachesFromCluster(
132 String portalCacheManagerName, String ... portalCacheNames)
133 throws Exception {
134
135 synchronized (ClusterLinkBootstrapLoaderHelperUtil.class) {
136 if (!_started) {
137 List<String> portalCaches = _deferredPortalCaches.get(
138 portalCacheManagerName);
139
140 if (portalCaches == null) {
141 portalCaches = new ArrayList<String>();
142
143 _deferredPortalCaches.put(
144 portalCacheManagerName, portalCaches);
145 }
146
147 portalCaches.addAll(Arrays.asList(portalCacheNames));
148
149 return;
150 }
151 }
152
153 List<Address> clusterNodeAddresses =
154 ClusterExecutorUtil.getClusterNodeAddresses();
155
156 if (_log.isInfoEnabled()) {
157 _log.info("Cluster node addresses " + clusterNodeAddresses);
158 }
159
160 if (clusterNodeAddresses.size() <= 1) {
161 if (_log.isDebugEnabled()) {
162 _log.debug(
163 "Not loading cache from cluster because a cluster peer " +
164 "was not found");
165 }
166
167 return;
168 }
169
170 PortalCacheManager<? extends Serializable, ?> portalCacheManager =
171 PortalCacheProvider.getPortalCacheManager(portalCacheManagerName);
172
173 if (!portalCacheManager.isClusterAware()) {
174 return;
175 }
176
177 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
178 new MethodHandler(
179 _createServerSocketFromClusterMethodKey, portalCacheManagerName,
180 Arrays.asList(portalCacheNames)),
181 true);
182
183 FutureClusterResponses futureClusterResponses =
184 ClusterExecutorUtil.execute(clusterRequest);
185
186 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
187 futureClusterResponses.getPartialResults();
188
189 ClusterNodeResponse clusterNodeResponse = null;
190
191 try {
192 clusterNodeResponse = clusterNodeResponses.poll(
193 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
194 TimeUnit.MILLISECONDS);
195 }
196 catch (InterruptedException ie) {
197 return;
198 }
199
200 if (clusterNodeResponse == null) {
201 if (_log.isWarnEnabled()) {
202 _log.warn(
203 "Unable to load cache from the cluster because there " +
204 "was no peer response");
205 }
206
207 return;
208 }
209
210 if (_log.isInfoEnabled()) {
211 _log.info(
212 "Load cache data from cluster node " +
213 clusterNodeResponse.getClusterNode());
214 }
215
216 Socket socket = null;
217
218 try {
219 SocketAddress remoteSocketAddress =
220 (SocketAddress)clusterNodeResponse.getResult();
221
222 if (remoteSocketAddress == null) {
223 _log.error(
224 "Cluster peer " + clusterNodeResponse.getClusterNode() +
225 " responded with a null socket address");
226
227 return;
228 }
229
230 socket = new Socket();
231
232 socket.connect(remoteSocketAddress);
233
234 socket.shutdownOutput();
235
236 ObjectInputStream objectInputStream =
237 new AnnotatedObjectInputStream(socket.getInputStream());
238
239 PortalCache<Serializable, Serializable> portalCache = null;
240
241 try {
242 while (true) {
243 Object object = objectInputStream.readObject();
244
245 if (object instanceof CacheElement) {
246 CacheElement cacheElement = (CacheElement)object;
247
248 portalCache.putQuiet(
249 cacheElement.getKey(), cacheElement.getValue());
250 }
251 else if (object instanceof String) {
252 if (_COMMAND_SOCKET_CLOSE.equals(object)) {
253 break;
254 }
255
256 _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
257
258 try {
259 portalCache =
260 (PortalCache<Serializable, Serializable>)
261 portalCacheManager.getCache((String)object);
262 }
263 finally {
264 _skipBootstrapLoaderThreadLocal.remove();
265 }
266 }
267 else {
268 throw new SystemException(
269 "Socket input stream returned invalid object " +
270 object);
271 }
272 }
273 }
274 finally {
275 if (objectInputStream != null) {
276 objectInputStream.close();
277 }
278 }
279 }
280 catch (Exception e) {
281 throw new Exception(
282 "Unable to load cache data from cluster node " +
283 clusterNodeResponse.getClusterNode(),
284 e);
285 }
286 finally {
287 if (socket != null) {
288 socket.close();
289 }
290 }
291 }
292
293 private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
294
295 private static final Log _log = LogFactoryUtil.getLog(
296 ClusterLinkBootstrapLoaderHelperUtil.class);
297
298 private static final MethodKey _createServerSocketFromClusterMethodKey =
299 new MethodKey(
300 ClusterLinkBootstrapLoaderHelperUtil.class,
301 "createServerSocketFromCluster", String.class, List.class);
302 private static final Map<String, List<String>> _deferredPortalCaches =
303 new HashMap<String, List<String>>();
304 private static final ServerSocketConfigurator _serverSocketConfigurator =
305 new SocketCacheServerSocketConfiguration();
306 private static final ThreadLocal<Boolean> _skipBootstrapLoaderThreadLocal =
307 new InitialThreadLocal<Boolean>(
308 ClusterLinkBootstrapLoaderHelperUtil.class +
309 "._skipBootstrapLoaderThreadLocal",
310 false);
311 private static boolean _started;
312
313 private static class CacheElement implements Serializable {
314
315 public CacheElement(Serializable key, Serializable value) {
316 _key = key;
317 _value = value;
318 }
319
320 public Serializable getKey() {
321 return _key;
322 }
323
324 public Serializable getValue() {
325 return _value;
326 }
327
328 private final Serializable _key;
329 private final Serializable _value;
330
331 }
332
333 private static class ClusterLinkBootstrapLoaderServerThread extends Thread {
334
335 public ClusterLinkBootstrapLoaderServerThread(
336 ServerSocket serverSocket, String portalCacheManagerName,
337 List<String> portalCacheNames) {
338
339 _serverSocket = serverSocket;
340 _portalCacheManagerName = portalCacheManagerName;
341 _portalCacheNames = portalCacheNames;
342
343 setDaemon(true);
344 setName(
345 ClusterLinkBootstrapLoaderServerThread.class.getName() + " - " +
346 portalCacheNames);
347 setPriority(Thread.NORM_PRIORITY);
348 }
349
350 @Override
351 public void run() {
352 Socket socket = null;
353
354 try {
355 try {
356 socket = _serverSocket.accept();
357 }
358 catch (SocketTimeoutException ste) {
359 if (_log.isDebugEnabled()) {
360 _log.debug(
361 "Terminating the socket thread " + getName() +
362 " that the client requested but never used");
363 }
364
365 return;
366 }
367 finally {
368 _serverSocket.close();
369 }
370
371 socket.shutdownInput();
372
373 try (OutputStream outputStream = socket.getOutputStream();
374 ObjectOutputStream objectOutputStream =
375 new AnnotatedObjectOutputStream(outputStream)) {
376
377 PortalCacheManager<? extends Serializable, ?>
378 portalCacheManager =
379 PortalCacheProvider.getPortalCacheManager(
380 _portalCacheManagerName);
381
382 for (String portalCacheName : _portalCacheNames) {
383 PortalCache<Serializable, Serializable> portalCache =
384 (PortalCache<Serializable, Serializable>)
385 portalCacheManager.getCache(portalCacheName);
386
387 if (portalCache == null) {
388 _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
389
390 try {
391 portalCacheManager.getCache(portalCacheName);
392 }
393 finally {
394 _skipBootstrapLoaderThreadLocal.remove();
395 }
396
397 continue;
398 }
399
400 objectOutputStream.writeObject(portalCacheName);
401
402 List<Serializable> keys = portalCache.getKeys();
403
404 for (Serializable key : keys) {
405 Serializable value = portalCache.get(key);
406
407 CacheElement cacheElement = new CacheElement(
408 key, value);
409
410 objectOutputStream.writeObject(cacheElement);
411 }
412 }
413
414 objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
415 }
416 }
417 catch (Exception e) {
418 throw new RuntimeException(e);
419 }
420 finally {
421 if (socket != null) {
422 try {
423 socket.close();
424 }
425 catch (IOException ioe) {
426 throw new RuntimeException(ioe);
427 }
428 }
429 }
430 }
431
432 private final String _portalCacheManagerName;
433 private final List<String> _portalCacheNames;
434 private final ServerSocket _serverSocket;
435
436 }
437
438 private static class SocketCacheServerSocketConfiguration
439 implements ServerSocketConfigurator {
440
441 @Override
442 public void configure(ServerSocket serverSocket)
443 throws SocketException {
444
445 serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
446 }
447
448 }
449
450 }