From 9a4ce2607dfbf6e9a9731a19536aa7a4f5552ffd Mon Sep 17 00:00:00 2001 From: Jon Shoemaker Date: Tue, 13 Sep 2022 15:48:29 +0000 Subject: [PATCH] NIFI-9878 Added timeout handling for Cache Client handshaking This closes #6414 Co-authored-by: Nissim Shiman Co-authored-by: Jon Shoemaker Signed-off-by: David Handermann --- .../client/CacheClientChannelInitializer.java | 2 +- .../client/CacheClientHandshakeHandler.java | 38 ++++++++++++-- .../client/CacheClientRequestHandler.java | 24 +++++---- .../TestDistributedMapServerAndClient.java | 51 +++++++++++++++++++ 4 files changed, 101 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java index 4f891f448b..13e50bd9f7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java @@ -78,7 +78,7 @@ public class CacheClientChannelInitializer extends ChannelInitializer { final VersionNegotiator versionNegotiator = versionNegotiatorFactory.create(); channelPipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS)); channelPipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS)); - channelPipeline.addLast(new CacheClientHandshakeHandler(channel, versionNegotiator)); + channelPipeline.addLast(new CacheClientHandshakeHandler(channel, versionNegotiator, writeTimeout.toMillis())); channelPipeline.addLast(new CacheClientRequestHandler()); channelPipeline.addLast(new CloseContextIdleStateHandler()); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java index 2dd0be35b7..899ed01523 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -64,24 +65,37 @@ public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter { */ private final VersionNegotiator versionNegotiator; + /** + * THe network timeout associated with handshake completion + */ + private final long timeoutMillis; + /** * Constructor. * * @param channel the channel to which this {@link io.netty.channel.ChannelHandler} is bound. * @param versionNegotiator coordinator used to broker the version of the distributed cache protocol with the service + * @param timeoutMillis the network timeout associated with handshake completion */ - public CacheClientHandshakeHandler(final Channel channel, final VersionNegotiator versionNegotiator) { + public CacheClientHandshakeHandler(final Channel channel, final VersionNegotiator versionNegotiator, + final long timeoutMillis) { this.promiseHandshakeComplete = channel.newPromise(); this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED); this.versionNegotiator = versionNegotiator; + this.timeoutMillis = timeoutMillis; } /** * API providing client application with visibility into the handshake process. Distributed cache requests - * should not be sent using this {@link Channel} until the handshake is complete. + * should not be sent using this {@link Channel} until the handshake is complete. Since the handshake might fail, + * {@link #isSuccess()} should be called after this method completes. */ public void waitHandshakeComplete() { - promiseHandshakeComplete.awaitUninterruptibly(); + promiseHandshakeComplete.awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS); + if (!promiseHandshakeComplete.isSuccess()) { + HandshakeException ex = new HandshakeException("Handshake timed out before completion."); + promiseHandshakeComplete.setFailure(ex); + } } /** @@ -157,4 +171,22 @@ public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter { promiseHandshakeComplete.setSuccess(); } } + + /** + * Returns if the handshake completed successfully + * + * @return success/failure of handshake + */ + public boolean isSuccess() { + return promiseHandshakeComplete.isSuccess(); + } + + /** + * Return reason for handshake failure. + * + * @return cause for handshake failure or null on success + */ + public Throwable cause() { + return promiseHandshakeComplete.cause(); + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java index 62eccba1d0..50eeb0de4a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java @@ -90,16 +90,20 @@ public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter { public void invoke(final Channel channel, final OutboundAdapter outboundAdapter, final InboundAdapter inboundAdapter) throws IOException { final CacheClientHandshakeHandler handshakeHandler = channel.pipeline().get(CacheClientHandshakeHandler.class); handshakeHandler.waitHandshakeComplete(); - if (handshakeHandler.getVersionNegotiator().getVersion() < outboundAdapter.getMinimumVersion()) { - throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + outboundAdapter.getMinimumVersion()); - } - this.inboundAdapter = inboundAdapter; - channelPromise = channel.newPromise(); - channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes())); - channelPromise.awaitUninterruptibly(); - this.inboundAdapter = new NullInboundAdapter(); - if (channelPromise.cause() != null) { - throw new IOException("Request invocation failed", channelPromise.cause()); + if (handshakeHandler.isSuccess()) { + if (handshakeHandler.getVersionNegotiator().getVersion() < outboundAdapter.getMinimumVersion()) { + throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + outboundAdapter.getMinimumVersion()); + } + this.inboundAdapter = inboundAdapter; + channelPromise = channel.newPromise(); + channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes())); + channelPromise.awaitUninterruptibly(); + this.inboundAdapter = new NullInboundAdapter(); + if (channelPromise.cause() != null) { + throw new IOException("Request invocation failed", channelPromise.cause()); + } + } else { + throw new IOException("Request invocation failed", handshakeHandler.cause()); } } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java index d32946e83c..46465b55ea 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java @@ -29,6 +29,14 @@ import org.apache.nifi.distributed.cache.protocol.ProtocolVersion; import org.apache.nifi.distributed.cache.server.CacheServer; import org.apache.nifi.distributed.cache.server.DistributedCacheServer; import org.apache.nifi.distributed.cache.server.EvictionPolicy; +import org.apache.nifi.event.transport.EventServer; +import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod; +import org.apache.nifi.event.transport.configuration.ShutdownTimeout; +import org.apache.nifi.event.transport.configuration.TransportProtocol; +import org.apache.nifi.event.transport.message.ByteArrayMessage; +import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory; +import org.apache.nifi.event.transport.netty.NettyEventServerFactory; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.Processor; import org.apache.nifi.remote.StandardVersionNegotiator; @@ -44,6 +52,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import java.net.InetAddress; +import java.net.UnknownHostException; + import javax.net.ssl.SSLContext; import java.io.File; import java.io.IOException; @@ -52,6 +63,8 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -298,6 +311,44 @@ public class TestDistributedMapServerAndClient { } } + @Test + public void testIncompleteHandshakeScenario() throws InitializationException, IOException { + // Default port used by Distributed Server and Client + final int port = NetworkUtils.getAvailableTcpPort(); + + // This is used to simulate a DistributedCacheServer that does not complete the handshake response + final BlockingQueue messages = new LinkedBlockingQueue<>(); + final NettyEventServerFactory serverFactory = getEventServerFactory(port, messages); + final EventServer eventServer = serverFactory.getEventServer(); + + DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + + runner.addControllerService("client", client); + runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost"); + runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(port)); + runner.setProperty(client, DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "250 ms"); + runner.enableControllerService(client); + + final Serializer valueSerializer = new StringSerializer(); + final Serializer keySerializer = new StringSerializer(); + final Deserializer deserializer = new StringDeserializer(); + + try { + assertThrows(IOException.class, () -> client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer)); + } finally { + eventServer.shutdown(); + } + } + + private NettyEventServerFactory getEventServerFactory(final int port, final BlockingQueue messages) throws UnknownHostException { + final ByteArrayMessageNettyEventServerFactory factory = new ByteArrayMessageNettyEventServerFactory(Mockito.mock(ComponentLog.class), + InetAddress.getByName("127.0.0.1"), port, TransportProtocol.TCP, "\n".getBytes(), 1024, messages); + factory.setWorkerThreads(1); + factory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration()); + factory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration()); + return factory; + } + private DistributedMapCacheClientService createClient(final int port) throws InitializationException { final DistributedMapCacheClientService client = new DistributedMapCacheClientService(); final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");