NIFI-10232 Set write and idle timeouts in DistributedCacheClient

- Added OnShutdown annotation to Cache Server and Cache Client Service methods

This closes #6221

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nathan Gough 2022-07-18 18:39:53 -04:00 committed by exceptionfactory
parent a661b035e8
commit 40d7d88bc0
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 24 additions and 13 deletions

View File

@ -20,11 +20,16 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.apache.nifi.event.transport.netty.CloseContextIdleStateHandler;
import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.VersionNegotiatorFactory; import org.apache.nifi.remote.VersionNegotiatorFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/** /**
* Bootstrap a new netty connection. This performs the socket handshake used by the nifi distributed set / * Bootstrap a new netty connection. This performs the socket handshake used by the nifi distributed set /
@ -43,15 +48,20 @@ public class CacheClientChannelInitializer extends ChannelInitializer<Channel> {
*/ */
private final VersionNegotiatorFactory versionNegotiatorFactory; private final VersionNegotiatorFactory versionNegotiatorFactory;
private final Duration idleTimeout;
private final Duration writeTimeout;
/** /**
* Constructor. * Constructor.
* *
* @param sslContext the secure context (if any) to be associated with the channel * @param sslContext the secure context (if any) to be associated with the channel
* @param factory creator of object used to broker the version of the distributed cache protocol with the service * @param factory creator of object used to broker the version of the distributed cache protocol with the service
*/ */
public CacheClientChannelInitializer(final SSLContext sslContext, final VersionNegotiatorFactory factory) { public CacheClientChannelInitializer(final SSLContext sslContext, final VersionNegotiatorFactory factory, final Duration idleTimeout, final Duration writeTimeout) {
this.sslContext = sslContext; this.sslContext = sslContext;
this.versionNegotiatorFactory = factory; this.versionNegotiatorFactory = factory;
this.idleTimeout = idleTimeout;
this.writeTimeout = writeTimeout;
} }
@Override @Override
@ -66,7 +76,10 @@ public class CacheClientChannelInitializer extends ChannelInitializer<Channel> {
} }
final VersionNegotiator versionNegotiator = versionNegotiatorFactory.create(); final VersionNegotiator versionNegotiator = versionNegotiatorFactory.create();
channelPipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS));
channelPipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
channelPipeline.addLast(new CacheClientHandshakeHandler(channel, versionNegotiator)); channelPipeline.addLast(new CacheClientHandshakeHandler(channel, versionNegotiator));
channelPipeline.addLast(new CacheClientRequestHandler()); channelPipeline.addLast(new CacheClientRequestHandler());
channelPipeline.addLast(new CloseContextIdleStateHandler());
} }
} }

View File

@ -31,6 +31,7 @@ import org.apache.nifi.remote.VersionNegotiatorFactory;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.time.Duration;
/** /**
* Factory for construction of new {@link ChannelPool}, used by distributed cache clients to invoke service * Factory for construction of new {@link ChannelPool}, used by distributed cache clients to invoke service
@ -71,7 +72,7 @@ public class CacheClientChannelPoolFactory {
final SSLContext sslContext = (sslContextService == null) ? null : sslContextService.createContext(); final SSLContext sslContext = (sslContextService == null) ? null : sslContextService.createContext();
final EventLoopGroup group = new NioEventLoopGroup(); final EventLoopGroup group = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap(); final Bootstrap bootstrap = new Bootstrap();
final CacheClientChannelInitializer initializer = new CacheClientChannelInitializer(sslContext, factory); final CacheClientChannelInitializer initializer = new CacheClientChannelInitializer(sslContext, factory, Duration.ofMillis(timeoutMillis), Duration.ofMillis(timeoutMillis));
bootstrap.group(group) bootstrap.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
.remoteAddress(hostname, port) .remoteAddress(hostname, port)

View File

@ -65,7 +65,7 @@ public class DistributedCacheClient {
protected void invoke(final OutboundAdapter outboundAdapter, final InboundAdapter inboundAdapter) throws IOException { protected void invoke(final OutboundAdapter outboundAdapter, final InboundAdapter inboundAdapter) throws IOException {
final Channel channel = channelPool.acquire().syncUninterruptibly().getNow(); final Channel channel = channelPool.acquire().syncUninterruptibly().getNow();
try { try {
final CacheClientRequestHandler requestHandler = (CacheClientRequestHandler) channel.pipeline().last(); final CacheClientRequestHandler requestHandler = channel.pipeline().get(CacheClientRequestHandler.class);
requestHandler.invoke(channel, outboundAdapter, inboundAdapter); requestHandler.invoke(channel, outboundAdapter, inboundAdapter);
} finally { } finally {
channelPool.release(channel).syncUninterruptibly(); channelPool.release(channel).syncUninterruptibly();

View File

@ -21,7 +21,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
@ -116,21 +116,16 @@ public class DistributedMapCacheClientService extends AbstractControllerService
versionNegotiatorFactory); versionNegotiatorFactory);
} }
@OnShutdown
@OnDisabled @OnDisabled
public void onDisabled() throws IOException { public void onDisabled() throws IOException {
getLogger().debug("Disabling Map Cache Client Service"); if (cacheClient != null) {
this.cacheClient.close(); this.cacheClient.close();
}
this.versionNegotiatorFactory = null; this.versionNegotiatorFactory = null;
this.cacheClient = null; this.cacheClient = null;
} }
@OnStopped
public void onStopped() throws IOException {
if (isEnabled()) {
onDisabled();
}
}
@Override @Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer); final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
@ -99,6 +100,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
} }
} }
@OnShutdown
@OnDisabled @OnDisabled
public void shutdownServer() throws IOException { public void shutdownServer() throws IOException {
if (cacheServer != null) { if (cacheServer != null) {