NIFI-11210 Added read timeout to DistributedMapCacheClientService

This closes #6994

Co-authored-by: David Handermann <exceptionfactory@apache.org>
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Paul Grey 2023-02-24 17:52:05 -05:00 committed by exceptionfactory
parent b7df6b9716
commit 288031f5f6
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
7 changed files with 153 additions and 7 deletions

View File

@ -79,7 +79,8 @@ class NettyEventServer implements EventServer {
final String message = String.format("Close channel interrupted: Remote Address [%s]", channel.remoteAddress());
throw new EventException(message, e);
} finally {
group.shutdownGracefully(shutdownQuietPeriod.toMillis(), shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS).syncUninterruptibly();
group.shutdownGracefully(shutdownQuietPeriod.toMillis(), shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)
.awaitUninterruptibly(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS);
}
}
}

View File

@ -79,7 +79,7 @@ public class CacheClientChannelInitializer extends ChannelInitializer<Channel> {
channelPipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS));
channelPipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
channelPipeline.addLast(new CacheClientHandshakeHandler(channel, versionNegotiator, writeTimeout.toMillis()));
channelPipeline.addLast(new CacheClientRequestHandler());
channelPipeline.addLast(new CacheClientRequestHandler(writeTimeout.toMillis()));
channelPipeline.addLast(new CloseContextIdleStateHandler());
}
}

View File

@ -27,6 +27,8 @@ import org.apache.nifi.distributed.cache.client.adapter.NullInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
/**
* The {@link io.netty.channel.ChannelHandler} responsible for sending client requests and receiving server responses
@ -44,6 +46,15 @@ public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter {
*/
private ChannelPromise channelPromise;
/**
* THe network timeout associated with the connection
*/
private final long timeoutMillis;
public CacheClientRequestHandler(final long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final ByteBuf byteBuf = (ByteBuf) msg;
@ -97,7 +108,10 @@ public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter {
this.inboundAdapter = inboundAdapter;
channelPromise = channel.newPromise();
channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes()));
channelPromise.awaitUninterruptibly();
final boolean completed = channelPromise.awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS);
if (!completed) {
throw new SocketTimeoutException(String.format("Request invocation timeout [%d ms] to remote address [%s]", timeoutMillis, channel.remoteAddress()));
}
this.inboundAdapter = new NullInboundAdapter();
if (channelPromise.cause() != null) {
throw new IOException("Request invocation failed", channelPromise.cause());

View File

@ -113,7 +113,8 @@ public class DistributedMapCacheClientService extends AbstractControllerService
context.getProperty(PORT).asInteger(),
context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class),
versionNegotiatorFactory);
versionNegotiatorFactory,
getLogger());
}
@OnShutdown

View File

@ -27,12 +27,14 @@ import org.apache.nifi.distributed.cache.client.adapter.ValueInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.VoidInboundAdapter;
import org.apache.nifi.distributed.cache.operations.MapOperation;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.VersionNegotiatorFactory;
import org.apache.nifi.ssl.SSLContextService;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
@ -40,6 +42,7 @@ import java.util.Set;
* communication services.
*/
public class NettyDistributedMapCacheClient extends DistributedCacheClient {
private final ComponentLog log;
/**
* Constructor.
@ -50,14 +53,18 @@ public class NettyDistributedMapCacheClient extends DistributedCacheClient {
* @param sslContextService the SSL context (if any) associated with requests to the service; if not specified,
* communications will not be encrypted
* @param factory creator of object used to broker the version of the distributed cache protocol with the service
* @param log Component Log from instantiating Services
*/
public NettyDistributedMapCacheClient(
final String hostname,
final int port,
final int timeoutMillis,
final SSLContextService sslContextService,
final VersionNegotiatorFactory factory) {
final VersionNegotiatorFactory factory,
final ComponentLog log
) {
super(hostname, port, timeoutMillis, sslContextService, factory);
this.log = Objects.requireNonNull(log, "Component Log required");
}
/**
@ -312,7 +319,11 @@ public class NettyDistributedMapCacheClient extends DistributedCacheClient {
* @throws IOException if unable to communicate with the remote instance
*/
public void close() throws IOException {
invoke(new OutboundAdapter().write(MapOperation.CLOSE.value()), new VoidInboundAdapter());
try {
invoke(new OutboundAdapter().write(MapOperation.CLOSE.value()), new VoidInboundAdapter());
} catch (final Exception e) {
log.warn("Sending close command failed: closing channel", e);
}
closeChannelPool();
}
}

View File

@ -48,7 +48,7 @@ public class BooleanInboundAdapter implements InboundAdapter {
* @return the service method response value
*/
public boolean getResult() {
return result;
return (result != null) && result;
}
@Override

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.operations.MapOperation;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
import org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler;
import org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder;
import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestDecoder;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestDistributedMapCacheClientService {
private static final String LOCALHOST = "127.0.0.1";
private static final int MAX_REQUEST_LENGTH = 64;
private final Serializer<String> serializer = new StringSerializer();
private int port;
private EventServer server;
private TestRunner runner;
@BeforeEach
public void setRunner() throws UnknownHostException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
port = NetworkUtils.getAvailableTcpPort();
final InetAddress serverAddress = InetAddress.getByName(LOCALHOST);
final NettyEventServerFactory serverFactory = new NettyEventServerFactory(serverAddress, port, TransportProtocol.TCP);
serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
serverFactory.setShutdownTimeout(ShutdownQuietPeriod.QUICK.getDuration());
final ComponentLog log = runner.getLogger();
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(ProtocolVersion.V3.value());
serverFactory.setHandlerSupplier(() -> Arrays.asList(
new CacheVersionResponseEncoder(),
new MapCacheRequestDecoder(log, MAX_REQUEST_LENGTH, MapOperation.values()),
new CacheVersionRequestHandler(log, versionNegotiator)
));
server = serverFactory.getEventServer();
}
@AfterEach
public void shutdownServer() {
server.shutdown();
}
/**
* Service will hold request long enough for client timeout to be triggered, thus causing the request to fail.
*/
@Test
public void testClientTimeoutOnServerNetworkFailure() throws InitializationException {
final String clientId = DistributedMapCacheClientService.class.getSimpleName();
final DistributedMapCacheClientService clientService = new DistributedMapCacheClientService();
runner.addControllerService(clientId, clientService);
runner.setProperty(clientService, DistributedMapCacheClientService.HOSTNAME, LOCALHOST);
runner.setProperty(clientService, DistributedMapCacheClientService.PORT, String.valueOf(port));
runner.setProperty(clientService, DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms");
runner.enableControllerService(clientService);
runner.assertValid();
try {
assertThrows(SocketTimeoutException.class, () -> clientService.put("key", "value", serializer, serializer));
} finally {
runner.disableControllerService(clientService);
}
}
private static class StringSerializer implements Serializer<String> {
@Override
public void serialize(final String value, final OutputStream output) throws IOException {
output.write(value.getBytes(StandardCharsets.UTF_8));
}
}
}