NIFI-9645 - Updated PutSplunk to allow idle connection timeouts

This closes #5841

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nathan Gough 2022-03-04 21:46:42 -05:00 committed by exceptionfactory
parent 7ff70706ee
commit 713e2fd03c
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 66 additions and 2 deletions

View File

@ -78,10 +78,11 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.build();
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
.Builder().name("Idle Connection Expiration")
.description("The amount of time a connection should be held open without being used before closing the connection.")
.description("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
.required(true)
.defaultValue("5 seconds")
.defaultValue("15 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
// Putting these properties here so sub-classes don't have to redefine them, but they are
@ -249,7 +250,9 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
factory.setShutdownQuietPeriod(Duration.ZERO); // Quiet period not necessary since sending threads will have completed before shutting down event sender
final int timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int idleTimeout = context.getProperty(IDLE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
factory.setTimeout(Duration.ofMillis(timeout));
factory.setIdleTimeout(Duration.ofSeconds(idleTimeout));
final PropertyValue sslContextServiceProperty = context.getProperty(SSL_CONTEXT_SERVICE);
if (sslContextServiceProperty.isSet()) {

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* Idle State Handler closes channel context when state indicates idle communications
*/
public class CloseContextIdleStateHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(final ChannelHandlerContext context, final Object event) {
if (event instanceof IdleStateEvent) {
final IdleStateEvent idleStateEvent = (IdleStateEvent) event;
if (idleStateEvent.state() == IdleState.ALL_IDLE) {
context.close();
}
}
}
}

View File

@ -61,6 +61,8 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
private Duration timeout = Duration.ofSeconds(30);
private Duration idleTimeout = Duration.ofSeconds(0);
private int maxConnections = Runtime.getRuntime().availableProcessors() * 2;
private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList();
@ -115,6 +117,13 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
this.timeout = Objects.requireNonNull(timeout, "Timeout required");
}
/**
* Set the idle timeout period for outgoing client connections
*/
public void setIdleTimeout(final Duration idleTimeout) {
this.idleTimeout = Objects.requireNonNull(idleTimeout, "Timeout required");
}
/**
* Set shutdown quiet period
*
@ -205,6 +214,7 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
? new StandardChannelInitializer<>(handlerSupplier)
: new ClientSslStandardChannelInitializer<>(handlerSupplier, sslContext);
channelInitializer.setWriteTimeout(timeout);
channelInitializer.setIdleTimeout(idleTimeout);
return channelInitializer;
}
}

View File

@ -20,7 +20,9 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.apache.nifi.event.transport.netty.CloseContextIdleStateHandler;
import java.time.Duration;
import java.util.List;
@ -37,6 +39,8 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
private Duration writeTimeout = Duration.ofSeconds(30);
private Duration idleTimeout = Duration.ofSeconds(0);
/**
* Standard Channel Initializer with handlers
*
@ -55,10 +59,19 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
this.writeTimeout = Objects.requireNonNull(writeTimeout);
}
/**
* Set the idle timeout period for outgoing client connections
*/
public void setIdleTimeout(final Duration idleTimeout) {
this.idleTimeout = Objects.requireNonNull(idleTimeout);
}
@Override
protected void initChannel(Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS));
pipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
pipeline.addLast(new CloseContextIdleStateHandler());
handlerSupplier.get().forEach(pipeline::addLast);
}
}