001
014
015 package com.liferay.portal.fabric.netty.client;
016
017 import com.liferay.portal.fabric.client.FabricClient;
018 import com.liferay.portal.fabric.local.agent.LocalFabricAgent;
019 import com.liferay.portal.fabric.netty.agent.NettyFabricAgentConfig;
020 import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectDecoder;
021 import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectEncoder;
022 import com.liferay.portal.fabric.netty.fileserver.handlers.FileRequestChannelHandler;
023 import com.liferay.portal.fabric.netty.fileserver.handlers.FileResponseChannelHandler;
024 import com.liferay.portal.fabric.netty.handlers.NettyChannelAttributes;
025 import com.liferay.portal.fabric.netty.handlers.NettyFabricWorkerExecutionChannelHandler;
026 import com.liferay.portal.fabric.netty.repository.NettyRepository;
027 import com.liferay.portal.fabric.netty.rpc.handlers.NettyRPCChannelHandler;
028 import com.liferay.portal.fabric.netty.util.NettyUtil;
029 import com.liferay.portal.fabric.repository.Repository;
030 import com.liferay.portal.fabric.worker.FabricWorker;
031 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
032 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.process.ProcessCallable;
036 import com.liferay.portal.kernel.process.ProcessExecutor;
037 import com.liferay.portal.kernel.process.TerminationProcessException;
038 import com.liferay.portal.kernel.util.NamedThreadFactory;
039
040 import io.netty.bootstrap.Bootstrap;
041 import io.netty.channel.Channel;
042 import io.netty.channel.ChannelFuture;
043 import io.netty.channel.ChannelFutureListener;
044 import io.netty.channel.ChannelInitializer;
045 import io.netty.channel.ChannelPipeline;
046 import io.netty.channel.EventLoopGroup;
047 import io.netty.channel.nio.NioEventLoopGroup;
048 import io.netty.channel.socket.SocketChannel;
049 import io.netty.channel.socket.nio.NioSocketChannel;
050 import io.netty.util.concurrent.DefaultEventExecutorGroup;
051 import io.netty.util.concurrent.EventExecutorGroup;
052 import io.netty.util.concurrent.Future;
053 import io.netty.util.concurrent.FutureListener;
054
055 import java.io.IOException;
056 import java.io.Serializable;
057
058 import java.lang.Thread.State;
059
060 import java.nio.file.Files;
061 import java.nio.file.Path;
062
063 import java.util.Map;
064 import java.util.concurrent.ExecutionException;
065 import java.util.concurrent.TimeUnit;
066 import java.util.concurrent.TimeoutException;
067 import java.util.concurrent.atomic.AtomicInteger;
068
069
072 public class NettyFabricClient implements FabricClient {
073
074 public NettyFabricClient(
075 ProcessExecutor processExecutor,
076 NettyFabricClientConfig nettyFabricClientConfig,
077 NettyFabricClientShutdownCallback nettyFabricClientShutdownCallback) {
078
079 _processExecutor = processExecutor;
080 _nettyFabricClientConfig = nettyFabricClientConfig;
081 _nettyFabricClientShutdownCallback = nettyFabricClientShutdownCallback;
082 }
083
084 @Override
085 public synchronized void connect() {
086 if (_channel != null) {
087 throw new IllegalStateException(
088 "Netty fabric client was already started");
089 }
090
091 if (_log.isInfoEnabled()) {
092 _log.info(
093 "Starting Netty fabric client using " +
094 _nettyFabricClientConfig);
095 }
096
097 Runtime runtime = Runtime.getRuntime();
098
099 runtime.addShutdownHook(_shutdownThread);
100
101 _bootstrap = new Bootstrap();
102
103 _bootstrap.channel(NioSocketChannel.class);
104 _bootstrap.group(
105 new NioEventLoopGroup(
106 _nettyFabricClientConfig.getEventLoopGroupThreadCount(),
107 new NamedThreadFactory(
108 "Netty Fabric Client/NIO Event Loop Group",
109 Thread.NORM_PRIORITY, null)));
110 _bootstrap.handler(new NettyFabricClientChannelInitializer());
111
112 int reconnectCount = _nettyFabricClientConfig.getReconnectCount();
113
114 if (reconnectCount < 0) {
115 reconnectCount = Integer.MAX_VALUE;
116 }
117
118 _reconnectCounter.set(reconnectCount);
119
120 doConnect();
121 }
122
123 @Override
124 public synchronized java.util.concurrent.Future<?> disconnect() {
125 if (_channel == null) {
126 throw new IllegalStateException(
127 "Netty fabric client is not started");
128 }
129
130 _reconnectCounter.set(0);
131
132 _channel.close();
133
134 EventExecutorGroup eventExecutorGroup = _bootstrap.group();
135
136 Future<?> future = eventExecutorGroup.terminationFuture();
137
138 final DefaultNoticeableFuture<?> defaultNoticeableFuture =
139 new DefaultNoticeableFuture<>();
140
141 future.addListener(
142 new FutureListener<Object>() {
143
144 @Override
145 public void operationComplete(Future<Object> future) {
146 defaultNoticeableFuture.run();
147 }
148
149 });
150
151 return defaultNoticeableFuture;
152 }
153
154 protected EventExecutorGroup createEventExecutorGroup(
155 int threadCount, String threadPoolName) {
156
157 EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(
158 threadCount,
159 new NamedThreadFactory(threadPoolName, Thread.NORM_PRIORITY, null));
160
161 NettyUtil.bindShutdown(
162 _bootstrap.group(), eventExecutorGroup,
163 _nettyFabricClientConfig.getShutdownQuietPeriod(),
164 _nettyFabricClientConfig.getShutdownTimeout());
165
166 return eventExecutorGroup;
167 }
168
169 protected void doConnect() {
170 ChannelFuture channelFuture = _bootstrap.connect(
171 _nettyFabricClientConfig.getNettyFabricServerHost(),
172 _nettyFabricClientConfig.getNettyFabricServerPort());
173
174 _channel = channelFuture.channel();
175
176 channelFuture.addListener(new PostConnectChannelFutureListener());
177 }
178
179 protected void terminateFabricWorkers(Channel channel) {
180 Map<Long, FabricWorker<?>> fabricWorkers =
181 NettyChannelAttributes.getFabricWorkers(channel);
182
183 if (fabricWorkers == null) {
184 return;
185 }
186
187 for (Map.Entry<Long, FabricWorker<?>> entry :
188 fabricWorkers.entrySet()) {
189
190 FabricWorker<?> fabricWorker = entry.getValue();
191
192 fabricWorker.write(_runtimeExitProcessCallable);
193
194 NoticeableFuture<?> noticeableFuture =
195 fabricWorker.getProcessNoticeableFuture();
196
197 try {
198 try {
199 noticeableFuture.get(
200 _nettyFabricClientConfig.getExecutionTimeout(),
201 TimeUnit.MILLISECONDS);
202 }
203 catch (TimeoutException te) {
204 fabricWorker.write(_runtimeHaltProcessCallable);
205
206 noticeableFuture.get(
207 _nettyFabricClientConfig.getExecutionTimeout(),
208 TimeUnit.MILLISECONDS);
209 }
210 }
211 catch (Throwable t) {
212 if (t instanceof ExecutionException) {
213 Throwable cause = t.getCause();
214
215 if (cause instanceof TerminationProcessException) {
216 TerminationProcessException tpe =
217 (TerminationProcessException)cause;
218
219 if (_log.isWarnEnabled()) {
220 _log.warn(
221 "Forcibly terminate fabric worker " +
222 entry.getKey() + " with exit code " +
223 tpe.getExitCode());
224 }
225
226 continue;
227 }
228 }
229
230 _log.error(
231 "Unable to terminate fabric worker " + entry.getKey(), t);
232 }
233 }
234 }
235
236 protected class NettyFabricClientChannelInitializer
237 extends ChannelInitializer<SocketChannel> {
238
239 @Override
240 protected void initChannel(SocketChannel socketChannel)
241 throws IOException {
242
243 Path repositoryPath = _nettyFabricClientConfig.getRepositoryPath();
244
245 Files.createDirectories(repositoryPath);
246
247 Repository<Channel> repository = new NettyRepository(
248 repositoryPath,
249 _nettyFabricClientConfig.getRepositoryGetFileTimeout());
250
251 ChannelFuture channelFuture = socketChannel.closeFuture();
252
253 channelFuture.addListener(
254 new PostDisconnectChannelFutureListener(repository));
255
256 ChannelPipeline channelPipeline = socketChannel.pipeline();
257
258 channelPipeline.addLast(
259 AnnotatedObjectEncoder.NAME, AnnotatedObjectEncoder.INSTANCE);
260 channelPipeline.addLast(
261 AnnotatedObjectDecoder.NAME, new AnnotatedObjectDecoder());
262
263 EventExecutorGroup fileServerEventExecutorGroup =
264 createEventExecutorGroup(
265 _nettyFabricClientConfig.getFileServerGroupThreadCount(),
266 "Netty Fabric Client/File Server Event Executor Group");
267
268 channelPipeline.addLast(
269 fileServerEventExecutorGroup, FileRequestChannelHandler.NAME,
270 new FileRequestChannelHandler(
271 _nettyFabricClientConfig.
272 getFileServerFolderCompressionLevel()));
273 channelPipeline.addLast(
274 new FileResponseChannelHandler(
275 repository.getAsyncBroker(), fileServerEventExecutorGroup));
276 channelPipeline.addLast(
277 createEventExecutorGroup(
278 _nettyFabricClientConfig.getRPCGroupThreadCount(),
279 "Netty Fabric Client/RPC Event Executor Group"),
280 NettyRPCChannelHandler.NAME, NettyRPCChannelHandler.INSTANCE);
281 channelPipeline.addLast(
282 createEventExecutorGroup(
283 _nettyFabricClientConfig.getExecutionGroupThreadCount(),
284 "Netty Fabric Client/Execution Event Executor Group"),
285 new NettyFabricWorkerExecutionChannelHandler(
286 repository, new LocalFabricAgent(_processExecutor),
287 _nettyFabricClientConfig.getExecutionTimeout()));
288 }
289
290 }
291
292 protected class PostConnectChannelFutureListener
293 implements ChannelFutureListener {
294
295 @Override
296 public void operationComplete(ChannelFuture channelFuture) {
297 Channel channel = channelFuture.channel();
298
299 if (channelFuture.isSuccess()) {
300 if (_log.isInfoEnabled()) {
301 _log.info("Connected to " + channel.remoteAddress());
302 }
303
304 Path repositoryPath =
305 _nettyFabricClientConfig.getRepositoryPath();
306
307 ChannelFuture registerChannelFuture = _channel.writeAndFlush(
308 new NettyFabricAgentConfig(repositoryPath.toFile()));
309
310 registerChannelFuture.addListener(
311 new PostRegisterChannelFutureListener());
312
313 return;
314 }
315
316 String serverAddress =
317 _nettyFabricClientConfig.getNettyFabricServerHost() + ":" +
318 _nettyFabricClientConfig.getNettyFabricServerPort();
319
320 if (channelFuture.isCancelled()) {
321 _log.error("Cancelled connecting to " + serverAddress);
322 }
323 else {
324 _log.error(
325 "Unable to connect to " + serverAddress,
326 channelFuture.cause());
327 }
328 }
329
330 }
331
332 protected class PostDisconnectChannelFutureListener
333 implements ChannelFutureListener {
334
335 @Override
336 public void operationComplete(ChannelFuture channelFuture) {
337 terminateFabricWorkers(_channel);
338
339 repository.dispose(true);
340
341 EventLoopGroup eventLoopGroup = _bootstrap.group();
342
343 if (_reconnectCounter.getAndDecrement() > 0) {
344 eventLoopGroup.schedule(
345 new Runnable() {
346
347 @Override
348 public void run() {
349 doConnect();
350 }
351
352 },
353 _nettyFabricClientConfig.getReconnectInterval(),
354 TimeUnit.MILLISECONDS);
355
356 if (_log.isInfoEnabled()) {
357 _log.info(
358 "Try to reconnect " +
359 _nettyFabricClientConfig.getReconnectInterval() +
360 " ms later");
361 }
362 }
363 else {
364 if (_log.isInfoEnabled()) {
365 _log.info(
366 "Shutting down Netty fabric client on " + _channel);
367 }
368
369 Future<?> future = eventLoopGroup.shutdownGracefully(
370 _nettyFabricClientConfig.getShutdownQuietPeriod(),
371 _nettyFabricClientConfig.getShutdownTimeout(),
372 TimeUnit.MILLISECONDS);
373
374 future.addListener(new PostShutdownChannelFutureListener());
375 }
376 }
377
378 protected PostDisconnectChannelFutureListener(
379 Repository<Channel> repository) {
380
381 this.repository = repository;
382 }
383
384 protected final Repository<Channel> repository;
385
386 }
387
388 protected class PostRegisterChannelFutureListener
389 implements ChannelFutureListener {
390
391 @Override
392 public void operationComplete(ChannelFuture channelFuture) {
393 if (channelFuture.isSuccess()) {
394 int reconnectCount =
395 _nettyFabricClientConfig.getReconnectCount();
396
397 if (reconnectCount < 0) {
398 reconnectCount = Integer.MAX_VALUE;
399 }
400
401 _reconnectCounter.set(reconnectCount);
402
403 if (_log.isInfoEnabled()) {
404 _log.info("Registered Netty fabric agent on " + _channel);
405 }
406
407 return;
408 }
409
410 _log.error("Unable to register Netty fabric agent on " + _channel);
411
412 _channel.close();
413 }
414
415 }
416
417 protected class PostShutdownChannelFutureListener
418 implements FutureListener<Object> {
419
420 @Override
421 public void operationComplete(Future<Object> future) {
422 _channel = null;
423 _bootstrap = null;
424
425 _nettyFabricClientShutdownCallback.shutdown();
426
427 if (_shutdownThread.getState() == State.NEW) {
428 Runtime runtime = Runtime.getRuntime();
429
430 runtime.removeShutdownHook(_shutdownThread);
431 }
432 }
433
434 }
435
436 private static final int _FABRIC_AGENT_SHUTDOWN_CODE = 211;
437
438 private static final Log _log = LogFactoryUtil.getLog(
439 NettyFabricClient.class);
440
441 private static final ProcessCallable<Serializable>
442 _runtimeExitProcessCallable = new ProcessCallable<Serializable>() {
443
444 @Override
445 public Serializable call() {
446 Runtime runtime = Runtime.getRuntime();
447
448 runtime.exit(_FABRIC_AGENT_SHUTDOWN_CODE);
449
450 return null;
451 }
452
453 private static final long serialVersionUID = 1L;
454
455 };
456
457 private static final ProcessCallable<Serializable>
458 _runtimeHaltProcessCallable = new ProcessCallable<Serializable>() {
459
460 @Override
461 public Serializable call() {
462 Runtime runtime = Runtime.getRuntime();
463
464 runtime.halt(_FABRIC_AGENT_SHUTDOWN_CODE);
465
466 return null;
467 }
468
469 private static final long serialVersionUID = 1L;
470
471 };
472
473 private volatile Bootstrap _bootstrap;
474 private volatile Channel _channel;
475 private final NettyFabricClientConfig _nettyFabricClientConfig;
476 private final NettyFabricClientShutdownCallback
477 _nettyFabricClientShutdownCallback;
478 private final ProcessExecutor _processExecutor;
479 private final AtomicInteger _reconnectCounter = new AtomicInteger();
480
481 private final Thread _shutdownThread = new Thread() {
482
483 @Override
484 public void run() {
485 Channel channel = _channel;
486
487 if (channel != null) {
488 _reconnectCounter.set(0);
489
490 ChannelFuture channelFuture = channel.close();
491
492 channelFuture.syncUninterruptibly();
493 }
494 }
495
496 };
497
498 }