NIFI-12370 Fixed Distributed Map Cache Client Service Shutdown

- Moved EventLoopGroup from CacheClientChannelPoolFactory to DistributedCacheClient to enable closing the EventLoopGroup after closing the ChannelPool

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8027.
This commit is contained in:
exceptionfactory 2023-11-14 20:30:57 -06:00 committed by Pierre Villard
parent 3619780813
commit 5815d83207
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 16 additions and 11 deletions

View File

@ -20,13 +20,11 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
import org.apache.nifi.remote.VersionNegotiatorFactory;
import org.apache.nifi.ssl.SSLContextService;
@ -39,10 +37,9 @@ import java.time.Duration;
* methods. Cache clients include the NiFi services {@link DistributedSetCacheClientService}
* and {@link DistributedMapCacheClientService}.
*/
public class CacheClientChannelPoolFactory {
class CacheClientChannelPoolFactory {
private static final int MAX_PENDING_ACQUIRES = 1024;
private static final boolean DAEMON_THREAD_ENABLED = true;
private int maxConnections = Runtime.getRuntime().availableProcessors() * 2;
@ -64,7 +61,7 @@ public class CacheClientChannelPoolFactory {
* @param sslContextService the SSL context (if any) associated with requests to the service; if not specified,
* communications will not be encrypted
* @param factory creator of object used to broker the version of the distributed cache protocol with the service
* @param poolName channel pool name, used for threads name prefix
* @param eventLoopGroup Netty Event Loop Group providing threads for managing connections
* @return a channel pool object from which {@link Channel} objects may be obtained
*/
public ChannelPool createChannelPool(final String hostname,
@ -72,12 +69,11 @@ public class CacheClientChannelPoolFactory {
final int timeoutMillis,
final SSLContextService sslContextService,
final VersionNegotiatorFactory factory,
final String poolName) {
final EventLoopGroup eventLoopGroup) {
final SSLContext sslContext = (sslContextService == null) ? null : sslContextService.createContext();
final EventLoopGroup group = new NioEventLoopGroup(new DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED));
final Bootstrap bootstrap = new Bootstrap();
final CacheClientChannelInitializer initializer = new CacheClientChannelInitializer(sslContext, factory, Duration.ofMillis(timeoutMillis), Duration.ofMillis(timeoutMillis));
bootstrap.group(group)
bootstrap.group(eventLoopGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
.remoteAddress(hostname, port)
.channel(NioSocketChannel.class);

View File

@ -17,7 +17,10 @@
package org.apache.nifi.distributed.cache.client;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
import org.apache.nifi.remote.VersionNegotiatorFactory;
@ -31,11 +34,15 @@ import java.io.IOException;
*/
public class DistributedCacheClient {
private static final boolean DAEMON_THREAD_ENABLED = true;
/**
* The pool of network connections used to service client requests.
*/
private final ChannelPool channelPool;
private final EventLoopGroup eventLoopGroup;
/**
* Constructor.
*
@ -53,9 +60,10 @@ public class DistributedCacheClient {
final SSLContextService sslContextService,
final VersionNegotiatorFactory factory,
final String identifier) {
String poolName = String.format("%s[%s]", getClass().getSimpleName(), identifier);
final String poolName = String.format("%s[%s]", getClass().getSimpleName(), identifier);
this.eventLoopGroup = new NioEventLoopGroup(new DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED));
this.channelPool = new CacheClientChannelPoolFactory().createChannelPool(
hostname, port, timeoutMillis, sslContextService, factory, poolName);
hostname, port, timeoutMillis, sslContextService, factory, eventLoopGroup);
}
/**
@ -76,9 +84,10 @@ public class DistributedCacheClient {
}
/**
* Shutdown {@link ChannelPool} cleanly.
* Close Channel Pool and supporting Event Loop Group
*/
protected void closeChannelPool() {
channelPool.close();
eventLoopGroup.close();
}
}