001
014
015 package com.liferay.portal.cache.bootstrap;
016
017 import com.liferay.portal.kernel.cache.PortalCache;
018 import com.liferay.portal.kernel.cache.PortalCacheHelperUtil;
019 import com.liferay.portal.kernel.cache.PortalCacheManager;
020 import com.liferay.portal.kernel.cache.PortalCacheProvider;
021 import com.liferay.portal.kernel.cluster.Address;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
029 import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
030 import com.liferay.portal.kernel.log.Log;
031 import com.liferay.portal.kernel.log.LogFactoryUtil;
032 import com.liferay.portal.kernel.util.InitialThreadLocal;
033 import com.liferay.portal.kernel.util.MethodHandler;
034 import com.liferay.portal.kernel.util.MethodKey;
035 import com.liferay.portal.kernel.util.SocketUtil;
036 import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
037 import com.liferay.portal.util.PropsValues;
038
039 import java.io.IOException;
040 import java.io.ObjectInputStream;
041 import java.io.ObjectOutputStream;
042 import java.io.OutputStream;
043 import java.io.Serializable;
044
045 import java.net.ServerSocket;
046 import java.net.Socket;
047 import java.net.SocketAddress;
048 import java.net.SocketException;
049 import java.net.SocketTimeoutException;
050
051 import java.nio.channels.ServerSocketChannel;
052
053 import java.util.ArrayList;
054 import java.util.Arrays;
055 import java.util.HashMap;
056 import java.util.List;
057 import java.util.Map;
058 import java.util.concurrent.BlockingQueue;
059 import java.util.concurrent.TimeUnit;
060
061
065 public class ClusterLinkBootstrapLoaderHelperUtil {
066
067 public static SocketAddress createServerSocketFromCluster(
068 String portalCacheManagerName, List<String> portalCacheNames)
069 throws Exception {
070
071 ServerSocketChannel serverSocketChannel =
072 SocketUtil.createServerSocketChannel(
073 ClusterLinkUtil.getBindInetAddress(),
074 PropsValues.EHCACHE_SOCKET_START_PORT,
075 _serverSocketConfigurator);
076
077 ServerSocket serverSocket = serverSocketChannel.socket();
078
079 ClusterLinkBootstrapLoaderServerThread
080 clusterLinkBootstrapLoaderServerThread =
081 new ClusterLinkBootstrapLoaderServerThread(
082 serverSocket, portalCacheManagerName, portalCacheNames);
083
084 clusterLinkBootstrapLoaderServerThread.start();
085
086 return serverSocket.getLocalSocketAddress();
087 }
088
089 public static synchronized void start() {
090 if (!_started) {
091 _started = true;
092 }
093
094 if (_deferredPortalCaches.isEmpty()) {
095 return;
096 }
097
098 if (_log.isDebugEnabled()) {
099 _log.debug("Loading deferred caches");
100 }
101
102 try {
103 for (Map.Entry<String, List<String>> entry :
104 _deferredPortalCaches.entrySet()) {
105
106 List<String> portalCacheNames = entry.getValue();
107
108 if (portalCacheNames.isEmpty()) {
109 continue;
110 }
111
112 loadCachesFromCluster(
113 entry.getKey(),
114 portalCacheNames.toArray(
115 new String[portalCacheNames.size()]));
116 }
117 }
118 catch (Exception e) {
119 if (_log.isWarnEnabled()) {
120 _log.warn("Unable to load cache data from the cluster", e);
121 }
122 }
123 finally {
124 _deferredPortalCaches.clear();
125 }
126 }
127
128 protected static boolean isSkipped() {
129 return _skipBootstrapLoaderThreadLocal.get();
130 }
131
132 protected static void loadCachesFromCluster(
133 String portalCacheManagerName, String ... portalCacheNames)
134 throws Exception {
135
136 synchronized (ClusterLinkBootstrapLoaderHelperUtil.class) {
137 if (!_started) {
138 List<String> portalCaches = _deferredPortalCaches.get(
139 portalCacheManagerName);
140
141 if (portalCaches == null) {
142 portalCaches = new ArrayList<String>();
143
144 _deferredPortalCaches.put(
145 portalCacheManagerName, portalCaches);
146 }
147
148 portalCaches.addAll(Arrays.asList(portalCacheNames));
149
150 return;
151 }
152 }
153
154 List<Address> clusterNodeAddresses =
155 ClusterExecutorUtil.getClusterNodeAddresses();
156
157 if (_log.isInfoEnabled()) {
158 _log.info("Cluster node addresses " + clusterNodeAddresses);
159 }
160
161 if (clusterNodeAddresses.size() <= 1) {
162 if (_log.isDebugEnabled()) {
163 _log.debug(
164 "Not loading cache from cluster because a cluster peer " +
165 "was not found");
166 }
167
168 return;
169 }
170
171 PortalCacheManager<? extends Serializable, ?> portalCacheManager =
172 PortalCacheProvider.getPortalCacheManager(portalCacheManagerName);
173
174 if (!portalCacheManager.isClusterAware()) {
175 return;
176 }
177
178 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
179 new MethodHandler(
180 _createServerSocketFromClusterMethodKey, portalCacheManagerName,
181 Arrays.asList(portalCacheNames)),
182 true);
183
184 FutureClusterResponses futureClusterResponses =
185 ClusterExecutorUtil.execute(clusterRequest);
186
187 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
188 futureClusterResponses.getPartialResults();
189
190 ClusterNodeResponse clusterNodeResponse = null;
191
192 try {
193 clusterNodeResponse = clusterNodeResponses.poll(
194 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
195 TimeUnit.MILLISECONDS);
196 }
197 catch (InterruptedException ie) {
198 return;
199 }
200
201 if (clusterNodeResponse == null) {
202 if (_log.isWarnEnabled()) {
203 _log.warn(
204 "Unable to load cache from the cluster because there " +
205 "was no peer response");
206 }
207
208 return;
209 }
210
211 if (_log.isInfoEnabled()) {
212 _log.info(
213 "Load cache data from cluster node " +
214 clusterNodeResponse.getClusterNode());
215 }
216
217 Socket socket = null;
218
219 try {
220 SocketAddress remoteSocketAddress =
221 (SocketAddress)clusterNodeResponse.getResult();
222
223 if (remoteSocketAddress == null) {
224 _log.error(
225 "Cluster peer " + clusterNodeResponse.getClusterNode() +
226 " responded with a null socket address");
227
228 return;
229 }
230
231 socket = new Socket();
232
233 socket.connect(remoteSocketAddress);
234
235 socket.shutdownOutput();
236
237 ObjectInputStream objectInputStream =
238 new AnnotatedObjectInputStream(socket.getInputStream());
239
240 PortalCache<Serializable, Serializable> portalCache = null;
241
242 try {
243 while (true) {
244 Object object = objectInputStream.readObject();
245
246 if (object instanceof CacheElement) {
247 CacheElement cacheElement = (CacheElement)object;
248
249 PortalCacheHelperUtil.putWithoutReplicator(
250 portalCache, cacheElement.getKey(),
251 cacheElement.getValue());
252 }
253 else if (object instanceof String) {
254 if (_COMMAND_SOCKET_CLOSE.equals(object)) {
255 break;
256 }
257
258 _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
259
260 try {
261 portalCache =
262 (PortalCache<Serializable, Serializable>)
263 portalCacheManager.getCache((String)object);
264 }
265 finally {
266 _skipBootstrapLoaderThreadLocal.remove();
267 }
268 }
269 else {
270 throw new SystemException(
271 "Socket input stream returned invalid object " +
272 object);
273 }
274 }
275 }
276 finally {
277 if (objectInputStream != null) {
278 objectInputStream.close();
279 }
280 }
281 }
282 catch (Exception e) {
283 throw new Exception(
284 "Unable to load cache data from cluster node " +
285 clusterNodeResponse.getClusterNode(),
286 e);
287 }
288 finally {
289 if (socket != null) {
290 socket.close();
291 }
292 }
293 }
294
295 private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
296
297 private static final Log _log = LogFactoryUtil.getLog(
298 ClusterLinkBootstrapLoaderHelperUtil.class);
299
300 private static final MethodKey _createServerSocketFromClusterMethodKey =
301 new MethodKey(
302 ClusterLinkBootstrapLoaderHelperUtil.class,
303 "createServerSocketFromCluster", String.class, List.class);
304 private static final Map<String, List<String>> _deferredPortalCaches =
305 new HashMap<String, List<String>>();
306 private static final ServerSocketConfigurator _serverSocketConfigurator =
307 new SocketCacheServerSocketConfiguration();
308 private static final ThreadLocal<Boolean> _skipBootstrapLoaderThreadLocal =
309 new InitialThreadLocal<Boolean>(
310 ClusterLinkBootstrapLoaderHelperUtil.class +
311 "._skipBootstrapLoaderThreadLocal",
312 false);
313 private static boolean _started;
314
315 private static class CacheElement implements Serializable {
316
317 public CacheElement(Serializable key, Serializable value) {
318 _key = key;
319 _value = value;
320 }
321
322 public Serializable getKey() {
323 return _key;
324 }
325
326 public Serializable getValue() {
327 return _value;
328 }
329
330 private final Serializable _key;
331 private final Serializable _value;
332
333 }
334
335 private static class ClusterLinkBootstrapLoaderServerThread extends Thread {
336
337 public ClusterLinkBootstrapLoaderServerThread(
338 ServerSocket serverSocket, String portalCacheManagerName,
339 List<String> portalCacheNames) {
340
341 _serverSocket = serverSocket;
342 _portalCacheManagerName = portalCacheManagerName;
343 _portalCacheNames = portalCacheNames;
344
345 setDaemon(true);
346 setName(
347 ClusterLinkBootstrapLoaderServerThread.class.getName() + " - " +
348 portalCacheNames);
349 setPriority(Thread.NORM_PRIORITY);
350 }
351
352 @Override
353 public void run() {
354 Socket socket = null;
355
356 try {
357 try {
358 socket = _serverSocket.accept();
359 }
360 catch (SocketTimeoutException ste) {
361 if (_log.isDebugEnabled()) {
362 _log.debug(
363 "Terminating the socket thread " + getName() +
364 " that the client requested but never used");
365 }
366
367 return;
368 }
369 finally {
370 _serverSocket.close();
371 }
372
373 socket.shutdownInput();
374
375 try (OutputStream outputStream = socket.getOutputStream();
376 ObjectOutputStream objectOutputStream =
377 new AnnotatedObjectOutputStream(outputStream)) {
378
379 PortalCacheManager<? extends Serializable, ?>
380 portalCacheManager =
381 PortalCacheProvider.getPortalCacheManager(
382 _portalCacheManagerName);
383
384 for (String portalCacheName : _portalCacheNames) {
385 PortalCache<Serializable, Serializable> portalCache =
386 (PortalCache<Serializable, Serializable>)
387 portalCacheManager.getCache(portalCacheName);
388
389 if (portalCache == null) {
390 _skipBootstrapLoaderThreadLocal.set(Boolean.TRUE);
391
392 try {
393 portalCacheManager.getCache(portalCacheName);
394 }
395 finally {
396 _skipBootstrapLoaderThreadLocal.remove();
397 }
398
399 continue;
400 }
401
402 objectOutputStream.writeObject(portalCacheName);
403
404 List<Serializable> keys = portalCache.getKeys();
405
406 for (Serializable key : keys) {
407 Serializable value = portalCache.get(key);
408
409 CacheElement cacheElement = new CacheElement(
410 key, value);
411
412 objectOutputStream.writeObject(cacheElement);
413 }
414 }
415
416 objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
417 }
418 }
419 catch (Exception e) {
420 throw new RuntimeException(e);
421 }
422 finally {
423 if (socket != null) {
424 try {
425 socket.close();
426 }
427 catch (IOException ioe) {
428 throw new RuntimeException(ioe);
429 }
430 }
431 }
432 }
433
434 private final String _portalCacheManagerName;
435 private final List<String> _portalCacheNames;
436 private final ServerSocket _serverSocket;
437
438 }
439
440 private static class SocketCacheServerSocketConfiguration
441 implements ServerSocketConfigurator {
442
443 @Override
444 public void configure(ServerSocket serverSocket)
445 throws SocketException {
446
447 serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
448 }
449
450 }
451
452 }