NIFI-9326 Added Socket Keep Alive property to ListenSyslog

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5479.
This commit is contained in:
exceptionfactory 2021-10-25 15:11:00 -05:00 committed by Pierre Villard
parent d7e41e2005
commit 67ccdf6159
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 31 additions and 2 deletions

View File

@ -58,6 +58,8 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
private Integer socketReceiveBuffer; private Integer socketReceiveBuffer;
private Boolean socketKeepAlive;
private SSLContext sslContext; private SSLContext sslContext;
private ClientAuth clientAuth = ClientAuth.NONE; private ClientAuth clientAuth = ClientAuth.NONE;
@ -81,6 +83,15 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
this.handlerSupplier = Objects.requireNonNull(handlerSupplier); this.handlerSupplier = Objects.requireNonNull(handlerSupplier);
} }
/**
* Set Socket Keep Alive for TCP Sockets
*
* @param socketKeepAlive Keep Alive can be null to use default setting
*/
public void setSocketKeepAlive(final Boolean socketKeepAlive) {
this.socketKeepAlive = socketKeepAlive;
}
/** /**
* Set Socket Receive Buffer Size for TCP Sockets * Set Socket Receive Buffer Size for TCP Sockets
* *
@ -134,17 +145,20 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
@Override @Override
public EventServer getEventServer() { public EventServer getEventServer() {
final AbstractBootstrap<?, ?> bootstrap = getBootstrap(); final AbstractBootstrap<?, ?> bootstrap = getBootstrap();
setBufferSize(bootstrap); setChannelOptions(bootstrap);
final EventLoopGroup group = getEventLoopGroup(); final EventLoopGroup group = getEventLoopGroup();
bootstrap.group(group); bootstrap.group(group);
return getBoundEventServer(bootstrap, group); return getBoundEventServer(bootstrap, group);
} }
private void setBufferSize(AbstractBootstrap<?, ?> bootstrap) { private void setChannelOptions(final AbstractBootstrap<?, ?> bootstrap) {
if (socketReceiveBuffer != null) { if (socketReceiveBuffer != null) {
bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer); bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(socketReceiveBuffer)); bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(socketReceiveBuffer));
} }
if (socketKeepAlive != null) {
bootstrap.option(ChannelOption.SO_KEEPALIVE, socketKeepAlive);
}
} }
private AbstractBootstrap<?, ?> getBootstrap() { private AbstractBootstrap<?, ?> getBootstrap() {

View File

@ -183,6 +183,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.defaultValue(ClientAuth.REQUIRED.name()) .defaultValue(ClientAuth.REQUIRED.name())
.dependsOn(SSL_CONTEXT_SERVICE) .dependsOn(SSL_CONTEXT_SERVICE)
.build(); .build();
public static final PropertyDescriptor SOCKET_KEEP_ALIVE = new PropertyDescriptor.Builder()
.name("socket-keep-alive")
.displayName("Socket Keep Alive")
.description("Whether or not to have TCP socket keep alive turned on. Timing details depend on operating system properties.")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
.defaultValue(Boolean.FALSE.toString())
.dependsOn(PROTOCOL, TCP_VALUE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -211,6 +221,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(PROTOCOL); descriptors.add(PROTOCOL);
descriptors.add(PORT); descriptors.add(PORT);
descriptors.add(NETWORK_INTF_NAME); descriptors.add(NETWORK_INTF_NAME);
descriptors.add(SOCKET_KEEP_ALIVE);
descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTH); descriptors.add(CLIENT_AUTH);
descriptors.add(RECV_BUFFER_SIZE); descriptors.add(RECV_BUFFER_SIZE);
@ -290,6 +301,9 @@ public class ListenSyslog extends AbstractSyslogProcessor {
factory.setWorkerThreads(maxConnections); factory.setWorkerThreads(maxConnections);
factory.setSocketReceiveBuffer(maxSocketBufferSize); factory.setSocketReceiveBuffer(maxSocketBufferSize);
final Boolean socketKeepAlive = context.getProperty(SOCKET_KEEP_ALIVE).asBoolean();
factory.setSocketKeepAlive(socketKeepAlive);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) { if (sslContextService != null) {
final SSLContext sslContext = sslContextService.createContext(); final SSLContext sslContext = sslContextService.createContext();

View File

@ -98,6 +98,7 @@ public class TestListenSyslog {
final TransportProtocol protocol = TransportProtocol.TCP; final TransportProtocol protocol = TransportProtocol.TCP;
runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString()); runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.PORT, Integer.toString(port)); runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
runner.setProperty(ListenSyslog.SOCKET_KEEP_ALIVE, Boolean.FALSE.toString());
assertSendSuccess(protocol, port); assertSendSuccess(protocol, port);
} }