001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018 import com.liferay.portal.kernel.cluster.Address;
019 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
020 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
021 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022 import com.liferay.portal.kernel.cluster.ClusterRequest;
023 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024 import com.liferay.portal.kernel.exception.SystemException;
025 import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
026 import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
027 import com.liferay.portal.kernel.log.Log;
028 import com.liferay.portal.kernel.log.LogFactoryUtil;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031 import com.liferay.portal.kernel.util.SocketUtil;
032 import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
033 import com.liferay.portal.util.PropsValues;
034
035 import java.io.IOException;
036 import java.io.ObjectInputStream;
037 import java.io.ObjectOutputStream;
038 import java.io.Serializable;
039
040 import java.net.ServerSocket;
041 import java.net.Socket;
042 import java.net.SocketAddress;
043 import java.net.SocketException;
044 import java.net.SocketTimeoutException;
045
046 import java.nio.channels.ServerSocketChannel;
047
048 import java.util.ArrayList;
049 import java.util.List;
050 import java.util.concurrent.BlockingQueue;
051 import java.util.concurrent.TimeUnit;
052
053 import net.sf.ehcache.CacheManager;
054 import net.sf.ehcache.Ehcache;
055 import net.sf.ehcache.Element;
056
057
061 public class EhcacheStreamBootstrapHelpUtil {
062
063 public static SocketAddress createServerSocketFromCluster(
064 List<String> cacheNames)
065 throws Exception {
066
067 ServerSocketChannel serverSocketChannel =
068 SocketUtil.createServerSocketChannel(
069 ClusterLinkUtil.getBindInetAddress(),
070 PropsValues.EHCACHE_SOCKET_START_PORT,
071 _serverSocketConfigurator);
072
073 ServerSocket serverSocket = serverSocketChannel.socket();
074
075 EhcachePortalCacheManager<?, ?> ehcachePortalCacheManager =
076 (EhcachePortalCacheManager<?, ?>)PortalBeanLocatorUtil.locate(
077 _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
078
079 CacheManager cacheManager =
080 ehcachePortalCacheManager.getEhcacheManager();
081
082 EhcacheStreamServerThread ehcacheStreamServerThread =
083 new EhcacheStreamServerThread(
084 serverSocket, cacheManager, cacheNames);
085
086 ehcacheStreamServerThread.start();
087
088 return serverSocket.getLocalSocketAddress();
089 }
090
091 protected static void loadCachesFromCluster(Ehcache... ehcaches)
092 throws Exception {
093
094 List<Address> clusterNodeAddresses =
095 ClusterExecutorUtil.getClusterNodeAddresses();
096
097 if (_log.isInfoEnabled()) {
098 _log.info("Cluster node addresses " + clusterNodeAddresses);
099 }
100
101 if (clusterNodeAddresses.size() <= 1) {
102 if (_log.isDebugEnabled()) {
103 _log.debug(
104 "Not loading cache from cluster because a cluster peer " +
105 "was not found");
106 }
107
108 return;
109 }
110
111 EhcachePortalCacheManager<?, ?> ehcachePortalCacheManager =
112 (EhcachePortalCacheManager<?, ?>)PortalBeanLocatorUtil.locate(
113 _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
114
115 CacheManager cacheManager =
116 ehcachePortalCacheManager.getEhcacheManager();
117
118 List<String> cacheNames = new ArrayList<String>();
119
120 for (Ehcache ehcache : ehcaches) {
121 cacheNames.add(ehcache.getName());
122 }
123
124 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
125 new MethodHandler(
126 _createServerSocketFromClusterMethodKey, cacheNames),
127 true);
128
129 FutureClusterResponses futureClusterResponses =
130 ClusterExecutorUtil.execute(clusterRequest);
131
132 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
133 futureClusterResponses.getPartialResults();
134
135 ClusterNodeResponse clusterNodeResponse = null;
136
137 try {
138 clusterNodeResponse = clusterNodeResponses.poll(
139 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
140 TimeUnit.MILLISECONDS);
141 }
142 catch (InterruptedException ie) {
143 return;
144 }
145
146 if (clusterNodeResponse == null) {
147 if (_log.isWarnEnabled()) {
148 _log.warn(
149 "Unable to load cache from the cluster because there " +
150 "was no peer response");
151 }
152
153 return;
154 }
155
156 Socket socket = null;
157 ObjectInputStream objectInputStream = null;
158
159 try {
160 SocketAddress remoteSocketAddress =
161 (SocketAddress)clusterNodeResponse.getResult();
162
163 if (remoteSocketAddress == null) {
164 _log.error(
165 "Cluster peer " + clusterNodeResponse.getClusterNode() +
166 " responded with a null socket address");
167
168 return;
169 }
170
171 socket = new Socket();
172
173 socket.connect(remoteSocketAddress);
174
175 socket.shutdownOutput();
176
177 objectInputStream = new AnnotatedObjectInputStream(
178 socket.getInputStream());
179
180 Ehcache ehcache = null;
181
182 while (true) {
183 Object object = objectInputStream.readObject();
184
185 if (object instanceof EhcacheElement) {
186 EhcacheElement ehcacheElement = (EhcacheElement)object;
187
188 Element element = ehcacheElement.toElement();
189
190 ehcache.put(element, true);
191 }
192 else if (object instanceof String) {
193 if (_COMMAND_SOCKET_CLOSE.equals(object)) {
194 break;
195 }
196
197 EhcacheStreamBootstrapCacheLoader.setSkip();
198
199 try {
200 ehcache = cacheManager.addCacheIfAbsent((String)object);
201 }
202 finally {
203 EhcacheStreamBootstrapCacheLoader.resetSkip();
204 }
205 }
206 else {
207 throw new SystemException(
208 "Socket input stream returned invalid object " +
209 object);
210 }
211 }
212 }
213 finally {
214 if (objectInputStream != null) {
215 objectInputStream.close();
216 }
217
218 if (socket != null) {
219 socket.close();
220 }
221 }
222 }
223
224 private static final String _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER =
225 "com.liferay.portal.kernel.cache.MultiVMPortalCacheManager";
226
227 private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
228
229 private static Log _log = LogFactoryUtil.getLog(
230 EhcacheStreamBootstrapHelpUtil.class);
231
232 private static MethodKey _createServerSocketFromClusterMethodKey =
233 new MethodKey(
234 EhcacheStreamBootstrapHelpUtil.class,
235 "createServerSocketFromCluster", List.class);
236 private static ServerSocketConfigurator _serverSocketConfigurator =
237 new SocketCacheServerSocketConfiguration();
238
239 private static class EhcacheElement implements Serializable {
240
241 public EhcacheElement(Serializable key, Serializable value) {
242 _key = key;
243 _value = value;
244 }
245
246 public Element toElement() {
247 return new Element(_key, _value);
248 }
249
250 private Serializable _key;
251 private Serializable _value;
252
253 }
254
255 private static class EhcacheStreamServerThread extends Thread {
256
257 public EhcacheStreamServerThread(
258 ServerSocket serverSocket, CacheManager cacheManager,
259 List<String> cacheNames) {
260
261 _serverSocket = serverSocket;
262 _cacheManager = cacheManager;
263 _cacheNames = cacheNames;
264
265 setDaemon(true);
266 setName(
267 EhcacheStreamServerThread.class.getName() + " - " + cacheNames);
268 setPriority(Thread.NORM_PRIORITY);
269 }
270
271 @Override
272 public void run() {
273 Socket socket = null;
274
275 try {
276 try {
277 socket = _serverSocket.accept();
278 }
279 catch (SocketTimeoutException ste) {
280 if (_log.isDebugEnabled()) {
281 _log.debug(
282 "Terminating the socket thread " + getName() +
283 " that the client requested but never used");
284 }
285
286 return;
287 }
288
289 _serverSocket.close();
290
291 socket.shutdownInput();
292
293 ObjectOutputStream objectOutputStream =
294 new AnnotatedObjectOutputStream(socket.getOutputStream());
295
296 for (String cacheName : _cacheNames) {
297 Ehcache ehcache = _cacheManager.getCache(cacheName);
298
299 if (ehcache == null) {
300 EhcacheStreamBootstrapCacheLoader.setSkip();
301
302 try {
303 _cacheManager.addCache(cacheName);
304 }
305 finally {
306 EhcacheStreamBootstrapCacheLoader.resetSkip();
307 }
308
309 continue;
310 }
311
312 objectOutputStream.writeObject(cacheName);
313
314 List<Object> keys = ehcache.getKeys();
315
316 for (Object key : keys) {
317 if (!(key instanceof Serializable)) {
318 if (_log.isWarnEnabled()) {
319 _log.warn(
320 "Key " + key + " is not serializable");
321 }
322
323 continue;
324 }
325
326 Element element = ehcache.get(key);
327
328 if (element == null) {
329 continue;
330 }
331
332 Object value = element.getObjectValue();
333
334 if (!(value instanceof Serializable)) {
335 if (_log.isWarnEnabled() && (value != null)) {
336 _log.warn(
337 "Value " + value + " is not serializable");
338 }
339
340 continue;
341 }
342
343 EhcacheElement ehcacheElement = new EhcacheElement(
344 (Serializable)key, (Serializable)value);
345
346 objectOutputStream.writeObject(ehcacheElement);
347 }
348 }
349
350 objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
351
352 objectOutputStream.close();
353 }
354 catch (Exception e) {
355 throw new RuntimeException(e);
356 }
357 finally {
358 if (socket != null) {
359 try {
360 socket.close();
361 }
362 catch (IOException ioe) {
363 throw new RuntimeException(ioe);
364 }
365 }
366 }
367 }
368
369 private CacheManager _cacheManager;
370 private List<String> _cacheNames;
371 private ServerSocket _serverSocket;
372
373 }
374
375 private static class SocketCacheServerSocketConfiguration
376 implements ServerSocketConfigurator {
377
378 @Override
379 public void configure(ServerSocket serverSocket)
380 throws SocketException {
381
382 serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
383 }
384
385 }
386
387 }