001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018 import com.liferay.portal.kernel.cluster.Address;
019 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
020 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
021 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022 import com.liferay.portal.kernel.cluster.ClusterRequest;
023 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024 import com.liferay.portal.kernel.exception.SystemException;
025 import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
026 import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
027 import com.liferay.portal.kernel.log.Log;
028 import com.liferay.portal.kernel.log.LogFactoryUtil;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031 import com.liferay.portal.kernel.util.SocketUtil;
032 import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
033 import com.liferay.portal.util.PropsValues;
034
035 import java.io.IOException;
036 import java.io.ObjectInputStream;
037 import java.io.ObjectOutputStream;
038 import java.io.Serializable;
039
040 import java.net.ServerSocket;
041 import java.net.Socket;
042 import java.net.SocketAddress;
043 import java.net.SocketException;
044 import java.net.SocketTimeoutException;
045
046 import java.nio.channels.ServerSocketChannel;
047
048 import java.util.ArrayList;
049 import java.util.Arrays;
050 import java.util.List;
051 import java.util.concurrent.BlockingQueue;
052 import java.util.concurrent.TimeUnit;
053
054 import net.sf.ehcache.CacheManager;
055 import net.sf.ehcache.Ehcache;
056 import net.sf.ehcache.Element;
057
058
062 public class EhcacheStreamBootstrapHelpUtil {
063
064 public static SocketAddress createServerSocketFromCluster(
065 List<String> allCacheNames, List<String> newCacheNames)
066 throws Exception {
067
068 ServerSocketChannel serverSocketChannel =
069 SocketUtil.createServerSocketChannel(
070 ClusterLinkUtil.getBindInetAddress(),
071 PropsValues.EHCACHE_SOCKET_START_PORT,
072 _serverSocketConfigurator);
073
074 ServerSocket serverSocket = serverSocketChannel.socket();
075
076 EhcachePortalCacheManager<?, ?> ehcachePortalCacheManager =
077 (EhcachePortalCacheManager<?, ?>)PortalBeanLocatorUtil.locate(
078 _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
079
080 CacheManager cacheManager =
081 ehcachePortalCacheManager.getEhcacheManager();
082
083 if (allCacheNames != null) {
084 newCacheNames.addAll(Arrays.asList(cacheManager.getCacheNames()));
085
086 newCacheNames.removeAll(allCacheNames);
087 }
088
089 EhcacheStreamServerThread ehcacheStreamServerThread =
090 new EhcacheStreamServerThread(
091 serverSocket, cacheManager, newCacheNames);
092
093 ehcacheStreamServerThread.start();
094
095 return serverSocket.getLocalSocketAddress();
096 }
097
098 protected static void loadCachesFromCluster(
099 boolean synchronizeCaches, Ehcache... newEhcaches)
100 throws Exception {
101
102 List<Address> clusterNodeAddresses =
103 ClusterExecutorUtil.getClusterNodeAddresses();
104
105 if (_log.isInfoEnabled()) {
106 _log.info("Cluster node addresses " + clusterNodeAddresses);
107 }
108
109 if (clusterNodeAddresses.size() <= 1) {
110 if (_log.isDebugEnabled()) {
111 _log.debug(
112 "Not loading cache from cluster because a cluster peer " +
113 "was not found");
114 }
115
116 return;
117 }
118
119 EhcachePortalCacheManager<?, ?> ehcachePortalCacheManager =
120 (EhcachePortalCacheManager<?, ?>)PortalBeanLocatorUtil.locate(
121 _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
122
123 CacheManager cacheManager =
124 ehcachePortalCacheManager.getEhcacheManager();
125
126 List<String> allCacheNames = null;
127
128 if (synchronizeCaches) {
129 allCacheNames = Arrays.asList(cacheManager.getCacheNames());
130 }
131
132 List<String> newCacheNames = new ArrayList<String>();
133
134 for (Ehcache ehcache : newEhcaches) {
135 newCacheNames.add(ehcache.getName());
136 }
137
138 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
139 new MethodHandler(
140 _createServerSocketFromClusterMethodKey, allCacheNames,
141 newCacheNames),
142 true);
143
144 FutureClusterResponses futureClusterResponses =
145 ClusterExecutorUtil.execute(clusterRequest);
146
147 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
148 futureClusterResponses.getPartialResults();
149
150 ClusterNodeResponse clusterNodeResponse = null;
151
152 try {
153 clusterNodeResponse = clusterNodeResponses.poll(
154 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
155 TimeUnit.MILLISECONDS);
156 }
157 catch (InterruptedException ie) {
158 return;
159 }
160
161 if (clusterNodeResponse == null) {
162 if (_log.isWarnEnabled()) {
163 _log.warn(
164 "Unable to load cache from the cluster because there " +
165 "was no peer response");
166 }
167
168 return;
169 }
170
171 Socket socket = null;
172 ObjectInputStream objectInputStream = null;
173
174 try {
175 SocketAddress remoteSocketAddress =
176 (SocketAddress)clusterNodeResponse.getResult();
177
178 if (remoteSocketAddress == null) {
179 _log.error(
180 "Cluster peer " + clusterNodeResponse.getClusterNode() +
181 " responded with a null socket address");
182
183 return;
184 }
185
186 socket = new Socket();
187
188 socket.connect(remoteSocketAddress);
189
190 socket.shutdownOutput();
191
192 objectInputStream = new AnnotatedObjectInputStream(
193 socket.getInputStream());
194
195 Ehcache ehcache = null;
196
197 while (true) {
198 Object object = objectInputStream.readObject();
199
200 if (object instanceof EhcacheElement) {
201 EhcacheElement ehcacheElement = (EhcacheElement)object;
202
203 Element element = ehcacheElement.toElement();
204
205 ehcache.put(element, true);
206 }
207 else if (object instanceof String) {
208 if (_COMMAND_SOCKET_CLOSE.equals(object)) {
209 break;
210 }
211
212 EhcacheStreamBootstrapCacheLoader.setSkip();
213
214 try {
215 ehcache = cacheManager.addCacheIfAbsent((String)object);
216 }
217 finally {
218 EhcacheStreamBootstrapCacheLoader.resetSkip();
219 }
220 }
221 else {
222 throw new SystemException(
223 "Socket input stream returned invalid object " +
224 object);
225 }
226 }
227 }
228 finally {
229 if (objectInputStream != null) {
230 objectInputStream.close();
231 }
232
233 if (socket != null) {
234 socket.close();
235 }
236 }
237 }
238
239 private static final String _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER =
240 "com.liferay.portal.kernel.cache.MultiVMPortalCacheManager";
241
242 private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
243
244 private static Log _log = LogFactoryUtil.getLog(
245 EhcacheStreamBootstrapHelpUtil.class);
246
247 private static MethodKey _createServerSocketFromClusterMethodKey =
248 new MethodKey(
249 EhcacheStreamBootstrapHelpUtil.class,
250 "createServerSocketFromCluster", List.class, List.class);
251 private static ServerSocketConfigurator _serverSocketConfigurator =
252 new SocketCacheServerSocketConfiguration();
253
254 private static class EhcacheElement implements Serializable {
255
256 public EhcacheElement(Serializable key, Serializable value) {
257 _key = key;
258 _value = value;
259 }
260
261 public Element toElement() {
262 return new Element(_key, _value);
263 }
264
265 private Serializable _key;
266 private Serializable _value;
267
268 }
269
270 private static class EhcacheStreamServerThread extends Thread {
271
272 public EhcacheStreamServerThread(
273 ServerSocket serverSocket, CacheManager cacheManager,
274 List<String> cacheNames) {
275
276 _serverSocket = serverSocket;
277 _cacheManager = cacheManager;
278 _cacheNames = cacheNames;
279
280 setDaemon(true);
281 setName(
282 EhcacheStreamServerThread.class.getName() + " - " + cacheNames);
283 setPriority(Thread.NORM_PRIORITY);
284 }
285
286 @Override
287 public void run() {
288 Socket socket = null;
289
290 try {
291 try {
292 socket = _serverSocket.accept();
293 }
294 catch (SocketTimeoutException ste) {
295 if (_log.isDebugEnabled()) {
296 _log.debug(
297 "Terminating the socket thread " + getName() +
298 " that the client requested but never used");
299 }
300
301 return;
302 }
303
304 _serverSocket.close();
305
306 socket.shutdownInput();
307
308 ObjectOutputStream objectOutputStream =
309 new AnnotatedObjectOutputStream(socket.getOutputStream());
310
311 for (String cacheName : _cacheNames) {
312 Ehcache ehcache = _cacheManager.getCache(cacheName);
313
314 if (ehcache == null) {
315 EhcacheStreamBootstrapCacheLoader.setSkip();
316
317 try {
318 _cacheManager.addCache(cacheName);
319 }
320 finally {
321 EhcacheStreamBootstrapCacheLoader.resetSkip();
322 }
323
324 continue;
325 }
326
327 objectOutputStream.writeObject(cacheName);
328
329 List<Object> keys = ehcache.getKeys();
330
331 for (Object key : keys) {
332 if (!(key instanceof Serializable)) {
333 if (_log.isWarnEnabled()) {
334 _log.warn(
335 "Key " + key + " is not serializable");
336 }
337
338 continue;
339 }
340
341 Element element = ehcache.get(key);
342
343 if (element == null) {
344 continue;
345 }
346
347 Object value = element.getObjectValue();
348
349 if (!(value instanceof Serializable)) {
350 if (_log.isWarnEnabled() && (value != null)) {
351 _log.warn(
352 "Value " + value + " is not serializable");
353 }
354
355 continue;
356 }
357
358 EhcacheElement ehcacheElement = new EhcacheElement(
359 (Serializable)key, (Serializable)value);
360
361 objectOutputStream.writeObject(ehcacheElement);
362 }
363 }
364
365 objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
366
367 objectOutputStream.close();
368 }
369 catch (Exception e) {
370 throw new RuntimeException(e);
371 }
372 finally {
373 if (socket != null) {
374 try {
375 socket.close();
376 }
377 catch (IOException ioe) {
378 throw new RuntimeException(ioe);
379 }
380 }
381 }
382 }
383
384 private CacheManager _cacheManager;
385 private List<String> _cacheNames;
386 private ServerSocket _serverSocket;
387
388 }
389
390 private static class SocketCacheServerSocketConfiguration
391 implements ServerSocketConfigurator {
392
393 @Override
394 public void configure(ServerSocket serverSocket)
395 throws SocketException {
396
397 serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
398 }
399
400 }
401
402 }