001
014
015 package com.liferay.portal.fabric.netty.client;
016
017 import com.liferay.portal.fabric.client.FabricClient;
018 import com.liferay.portal.fabric.local.agent.LocalFabricAgent;
019 import com.liferay.portal.fabric.netty.agent.NettyFabricAgentConfig;
020 import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectDecoder;
021 import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectEncoder;
022 import com.liferay.portal.fabric.netty.fileserver.handlers.FileRequestChannelHandler;
023 import com.liferay.portal.fabric.netty.fileserver.handlers.FileResponseChannelHandler;
024 import com.liferay.portal.fabric.netty.handlers.NettyChannelAttributes;
025 import com.liferay.portal.fabric.netty.handlers.NettyFabricWorkerExecutionChannelHandler;
026 import com.liferay.portal.fabric.netty.repository.NettyRepository;
027 import com.liferay.portal.fabric.netty.rpc.handlers.NettyRPCChannelHandler;
028 import com.liferay.portal.fabric.repository.Repository;
029 import com.liferay.portal.fabric.worker.FabricWorker;
030 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.process.ProcessCallable;
034 import com.liferay.portal.kernel.process.ProcessExecutor;
035 import com.liferay.portal.kernel.process.TerminationProcessException;
036 import com.liferay.portal.kernel.util.NamedThreadFactory;
037
038 import io.netty.bootstrap.Bootstrap;
039 import io.netty.channel.Channel;
040 import io.netty.channel.ChannelFuture;
041 import io.netty.channel.ChannelFutureListener;
042 import io.netty.channel.ChannelInitializer;
043 import io.netty.channel.ChannelPipeline;
044 import io.netty.channel.EventLoop;
045 import io.netty.channel.EventLoopGroup;
046 import io.netty.channel.nio.NioEventLoopGroup;
047 import io.netty.channel.socket.SocketChannel;
048 import io.netty.channel.socket.nio.NioSocketChannel;
049 import io.netty.util.Attribute;
050 import io.netty.util.AttributeKey;
051 import io.netty.util.concurrent.DefaultEventExecutorGroup;
052 import io.netty.util.concurrent.EventExecutorGroup;
053
054 import java.io.IOException;
055 import java.io.Serializable;
056
057 import java.nio.file.Files;
058 import java.nio.file.Path;
059
060 import java.util.Map;
061 import java.util.concurrent.Callable;
062 import java.util.concurrent.ExecutionException;
063 import java.util.concurrent.TimeUnit;
064 import java.util.concurrent.TimeoutException;
065 import java.util.concurrent.atomic.AtomicInteger;
066
067
070 public class NettyFabricClient implements FabricClient {
071
072 public NettyFabricClient(
073 ProcessExecutor processExecutor,
074 NettyFabricClientConfig nettyFabricClientConfig,
075 NettyFabricClientShutdownCallback nettyFabricClientShutdownCallback) {
076
077 _processExecutor = processExecutor;
078 _nettyFabricClientConfig = nettyFabricClientConfig;
079 _nettyFabricClientShutdownCallback = nettyFabricClientShutdownCallback;
080 }
081
082 @Override
083 public synchronized void connect() {
084 if (_channel != null) {
085 throw new IllegalStateException(
086 "Netty fabric client was already started");
087 }
088
089 if (_log.isInfoEnabled()) {
090 _log.info(
091 "Starting Netty fabric client using " +
092 _nettyFabricClientConfig);
093 }
094
095 Runtime runtime = Runtime.getRuntime();
096
097 runtime.addShutdownHook(_shutdownThread);
098
099 _bootstrap = new Bootstrap();
100
101 _bootstrap.attr(
102 _executionEventExecutorGroupAttributeKey,
103 new DefaultEventExecutorGroup(
104 _nettyFabricClientConfig.getExecutionGroupThreadCount(),
105 new NamedThreadFactory(
106 "Netty Fabric Client/Execution Event Executor Group",
107 Thread.NORM_PRIORITY, null)));
108 _bootstrap.attr(
109 _fileServerEventExecutorGroupAttributeKey,
110 new DefaultEventExecutorGroup(
111 _nettyFabricClientConfig.getFileServerGroupThreadCount(),
112 new NamedThreadFactory(
113 "Netty Fabric Client/File Server Event Executor Group",
114 Thread.NORM_PRIORITY, null)));
115 _bootstrap.attr(
116 _rpcEventExecutorGroupAttributeKey,
117 new DefaultEventExecutorGroup(
118 _nettyFabricClientConfig.getRPCGroupThreadCount(),
119 new NamedThreadFactory(
120 "Netty Fabric Client/RPC Event Executor Group",
121 Thread.NORM_PRIORITY, null)));
122 _bootstrap.channel(NioSocketChannel.class);
123 _bootstrap.group(
124 new NioEventLoopGroup(
125 _nettyFabricClientConfig.getEventLoopGroupThreadCount(),
126 new NamedThreadFactory(
127 "Netty Fabric Client/NIO Event Loop Group",
128 Thread.NORM_PRIORITY, null)));
129 _bootstrap.handler(new NettyFabricClientChannelInitializer());
130
131 int reconnectCount = _nettyFabricClientConfig.getReconnectCount();
132
133 if (reconnectCount < 0) {
134 reconnectCount = Integer.MAX_VALUE;
135 }
136
137 _reconnectCounter.set(reconnectCount);
138
139 doConnect();
140 }
141
142 @Override
143 public synchronized void disconnect() throws InterruptedException {
144 _reconnectCounter.set(0);
145
146 doDisconnect();
147 }
148
149 protected void disposeRepository(Channel channel) {
150 Attribute<Repository<Channel>> attribute = channel.attr(
151 _repositoryAttributeKey);
152
153 Repository<Channel> repository = attribute.getAndRemove();
154
155 if (repository != null) {
156 repository.dispose(true);
157 }
158 }
159
160 protected void doConnect() {
161 ChannelFuture channelFuture = _bootstrap.connect(
162 _nettyFabricClientConfig.getNettyFabricServerHost(),
163 _nettyFabricClientConfig.getNettyFabricServerPort());
164
165 _channel = channelFuture.channel();
166
167 channelFuture.addListener(new PostConnectChannelFutureListener());
168 }
169
170 protected void doDisconnect() throws InterruptedException {
171 if (_channel == null) {
172 throw new IllegalStateException(
173 "Netty fabric client is not started");
174 }
175
176 try {
177 ChannelFuture channelFuture = _channel.close();
178
179 channelFuture.sync();
180 }
181 finally {
182 terminateFabricWorkers(_channel);
183
184 disposeRepository(_channel);
185
186 EventLoop eventLoop = _channel.eventLoop();
187
188 EventLoopGroup eventLoopGroup = eventLoop.parent();
189
190 if (_reconnectCounter.getAndDecrement() > 0) {
191 eventLoopGroup.schedule(
192 new Callable<Void>() {
193
194 @Override
195 public Void call() {
196 doConnect();
197
198 return null;
199 }
200
201 },
202 _nettyFabricClientConfig.getReconnectInterval(),
203 TimeUnit.MILLISECONDS);
204
205 if (_log.isInfoEnabled()) {
206 _log.info(
207 "Try to reconnect " +
208 _nettyFabricClientConfig.getReconnectInterval() +
209 "ms later");
210 }
211 }
212 else {
213 if (_log.isInfoEnabled()) {
214 _log.info(
215 "Shutting down Netty fabric client on " + _channel);
216 }
217
218 try {
219 eventLoopGroup.shutdownGracefully();
220
221 shutdownEventExecutorGroup(
222 _channel, _executionEventExecutorGroupAttributeKey);
223 shutdownEventExecutorGroup(
224 _channel, _fileServerEventExecutorGroupAttributeKey);
225 shutdownEventExecutorGroup(
226 _channel, _rpcEventExecutorGroupAttributeKey);
227
228 _channel = null;
229 _bootstrap = null;
230 }
231 finally {
232 _nettyFabricClientShutdownCallback.shutdown();
233
234 Runtime runtime = Runtime.getRuntime();
235
236 runtime.removeShutdownHook(_shutdownThread);
237 }
238 }
239 }
240 }
241
242 protected EventExecutorGroup getEventExecutorGroup(
243 Channel channel, AttributeKey<EventExecutorGroup> attributeKey) {
244
245 Attribute<EventExecutorGroup> attribute = channel.attr(attributeKey);
246
247 return attribute.get();
248 }
249
250 protected Repository<Channel> getRepository(Channel channel)
251 throws IOException {
252
253 Attribute<Repository<Channel>> attribute = channel.attr(
254 _repositoryAttributeKey);
255
256 Repository<Channel> repository = attribute.get();
257
258 if (repository == null) {
259 Path repositoryPath = _nettyFabricClientConfig.getRepositoryPath();
260
261 Files.createDirectories(repositoryPath);
262
263 repository = new NettyRepository(
264 repositoryPath,
265 _nettyFabricClientConfig.getRepositoryGetFileTimeout());
266
267 Repository<Channel> previousRepository = attribute.setIfAbsent(
268 repository);
269
270 if (previousRepository != null) {
271 repository.dispose(true);
272
273 repository = previousRepository;
274 }
275
276 ChannelPipeline channelPipeline = channel.pipeline();
277
278 channelPipeline.addLast(
279 new FileResponseChannelHandler(
280 repository.getAsyncBroker(),
281 getEventExecutorGroup(
282 _channel, _fileServerEventExecutorGroupAttributeKey)));
283 }
284
285 return repository;
286 }
287
288 protected void registerNettyFabricAgent() throws IOException {
289 ChannelPipeline channelPipeline = _channel.pipeline();
290
291 Repository<Channel> repository = getRepository(_channel);
292
293 channelPipeline.addLast(
294 getEventExecutorGroup(
295 _channel, _executionEventExecutorGroupAttributeKey),
296 new NettyFabricWorkerExecutionChannelHandler(
297 repository, new LocalFabricAgent(_processExecutor),
298 _nettyFabricClientConfig.getExecutionTimeout()));
299
300 Path repositoryPath = repository.getRepositoryPath();
301
302 ChannelFuture channelFuture = _channel.writeAndFlush(
303 new NettyFabricAgentConfig(repositoryPath.toFile()));
304
305 channelFuture.addListener(new PostRegisterChannelFutureListener());
306 }
307
308 protected void shutdownEventExecutorGroup(
309 Channel channel, AttributeKey<EventExecutorGroup> attributeKey) {
310
311 Attribute<EventExecutorGroup> attribute = channel.attr(attributeKey);
312
313 EventExecutorGroup eventExecutorGroup = attribute.getAndRemove();
314
315 if (eventExecutorGroup != null) {
316 eventExecutorGroup.shutdownGracefully();
317 }
318 }
319
320 protected void terminateFabricWorkers(Channel channel) {
321 Map<Long, FabricWorker<?>> fabricWorkers =
322 NettyChannelAttributes.getFabricWorkers(channel);
323
324 if (fabricWorkers == null) {
325 return;
326 }
327
328 for (Map.Entry<Long, FabricWorker<?>> entry :
329 fabricWorkers.entrySet()) {
330
331 FabricWorker<?> fabricWorker = entry.getValue();
332
333 fabricWorker.write(_runtimeExitProcessCallable);
334
335 NoticeableFuture<?> noticeableFuture =
336 fabricWorker.getProcessNoticeableFuture();
337
338 try {
339 try {
340 noticeableFuture.get(
341 _nettyFabricClientConfig.getExecutionTimeout(),
342 TimeUnit.MILLISECONDS);
343 }
344 catch (TimeoutException te) {
345 fabricWorker.write(_runtimeHaltProcessCallable);
346
347 noticeableFuture.get(
348 _nettyFabricClientConfig.getExecutionTimeout(),
349 TimeUnit.MILLISECONDS);
350 }
351 }
352 catch (Throwable t) {
353 if (t instanceof ExecutionException) {
354 Throwable cause = t.getCause();
355
356 if (cause instanceof TerminationProcessException) {
357 TerminationProcessException tpe =
358 (TerminationProcessException)cause;
359
360 if (_log.isWarnEnabled()) {
361 _log.warn(
362 "Forcibly terminate fabric worker " +
363 entry.getKey() + " with exit code " +
364 tpe.getExitCode());
365 }
366
367 continue;
368 }
369 }
370
371 _log.error(
372 "Unable to terminate fabric worker " + entry.getKey(), t);
373 }
374 }
375 }
376
377 protected class NettyFabricClientChannelInitializer
378 extends ChannelInitializer<SocketChannel> {
379
380 @Override
381 protected void initChannel(SocketChannel socketChannel) {
382 ChannelPipeline channelPipeline = socketChannel.pipeline();
383
384 channelPipeline.addLast(
385 AnnotatedObjectEncoder.NAME, AnnotatedObjectEncoder.INSTANCE);
386 channelPipeline.addLast(
387 AnnotatedObjectDecoder.NAME, new AnnotatedObjectDecoder());
388 channelPipeline.addLast(
389 getEventExecutorGroup(
390 socketChannel, _fileServerEventExecutorGroupAttributeKey),
391 FileRequestChannelHandler.NAME,
392 new FileRequestChannelHandler(
393 _nettyFabricClientConfig.
394 getFileServerFolderCompressionLevel()));
395 channelPipeline.addLast(
396 getEventExecutorGroup(
397 socketChannel, _rpcEventExecutorGroupAttributeKey),
398 NettyRPCChannelHandler.NAME, NettyRPCChannelHandler.INSTANCE);
399 }
400
401 }
402
403 protected class PostConnectChannelFutureListener
404 implements ChannelFutureListener {
405
406 @Override
407 public void operationComplete(ChannelFuture channelFuture)
408 throws Exception {
409
410 Channel channel = channelFuture.channel();
411
412 if (channelFuture.isSuccess()) {
413 if (_log.isInfoEnabled()) {
414 _log.info("Connected to " + channel.remoteAddress());
415 }
416
417 registerNettyFabricAgent();
418
419 return;
420 }
421
422 String serverAddress =
423 _nettyFabricClientConfig.getNettyFabricServerHost() + ":" +
424 _nettyFabricClientConfig.getNettyFabricServerPort();
425
426 if (channelFuture.isCancelled()) {
427 _log.error("Cancelled connecting to " + serverAddress);
428 }
429 else {
430 _log.error(
431 "Unable to connect to " + serverAddress,
432 channelFuture.cause());
433 }
434
435 doDisconnect();
436 }
437
438 }
439
440 protected class PostDisconnectChannelFutureListener
441 implements ChannelFutureListener {
442
443 @Override
444 public void operationComplete(ChannelFuture channelFuture)
445 throws InterruptedException {
446
447 Channel channel = channelFuture.channel();
448
449 if (_log.isInfoEnabled()) {
450 _log.info("Disconnected from " + channel.remoteAddress());
451 }
452
453 doDisconnect();
454 }
455
456 }
457
458 protected class PostRegisterChannelFutureListener
459 implements ChannelFutureListener {
460
461 @Override
462 public void operationComplete(ChannelFuture channelFuture)
463 throws InterruptedException {
464
465 if (channelFuture.isSuccess()) {
466 _reconnectCounter.set(
467 _nettyFabricClientConfig.getReconnectCount());
468
469 if (_log.isInfoEnabled()) {
470 _log.info("Registered Netty fabric agent on " + _channel);
471 }
472
473 channelFuture = _channel.closeFuture();
474
475 channelFuture.addListener(
476 new PostDisconnectChannelFutureListener());
477
478 return;
479 }
480
481 _log.error("Unable to register Netty fabric agent on " + _channel);
482
483 doDisconnect();
484 }
485
486 }
487
488 private static final int _FABRIC_AGENT_SHUTDOWN_CODE = 211;
489
490 private static final Log _log = LogFactoryUtil.getLog(
491 NettyFabricClient.class);
492
493 private static final AttributeKey<EventExecutorGroup>
494 _executionEventExecutorGroupAttributeKey = AttributeKey.valueOf(
495 "ExecutionEventExecutorGroup");
496 private static final AttributeKey<EventExecutorGroup>
497 _fileServerEventExecutorGroupAttributeKey = AttributeKey.valueOf(
498 "FileServerEventExecutorGroup");
499 private static final AttributeKey<Repository<Channel>>
500 _repositoryAttributeKey = AttributeKey.valueOf("Repository");
501 private static final AttributeKey<EventExecutorGroup>
502 _rpcEventExecutorGroupAttributeKey = AttributeKey.valueOf(
503 "RPCEventExecutorGroup");
504
505 private static final ProcessCallable<Serializable>
506 _runtimeExitProcessCallable = new ProcessCallable<Serializable>() {
507
508 @Override
509 public Serializable call() {
510 Runtime runtime = Runtime.getRuntime();
511
512 runtime.exit(_FABRIC_AGENT_SHUTDOWN_CODE);
513
514 return null;
515 }
516
517 private static final long serialVersionUID = 1L;
518
519 };
520
521 private static final ProcessCallable<Serializable>
522 _runtimeHaltProcessCallable = new ProcessCallable<Serializable>() {
523
524 @Override
525 public Serializable call() {
526 Runtime runtime = Runtime.getRuntime();
527
528 runtime.halt(_FABRIC_AGENT_SHUTDOWN_CODE);
529
530 return null;
531 }
532
533 private static final long serialVersionUID = 1L;
534
535 };
536
537 private Bootstrap _bootstrap;
538 private volatile Channel _channel;
539 private final NettyFabricClientConfig _nettyFabricClientConfig;
540 private final NettyFabricClientShutdownCallback
541 _nettyFabricClientShutdownCallback;
542 private final ProcessExecutor _processExecutor;
543 private final AtomicInteger _reconnectCounter = new AtomicInteger();
544
545 private final Thread _shutdownThread = new Thread() {
546
547 @Override
548 public void run() {
549 Channel channel = _channel;
550
551 if (channel != null) {
552 _reconnectCounter.set(0);
553
554 ChannelFuture channelFuture = channel.close();
555
556 channelFuture.syncUninterruptibly();
557 }
558 }
559
560 };
561
562 }