From 7c50f1429edef6efedb7bfeaff388f22c0eeaa69 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Wed, 12 Jan 2022 12:20:41 -0600 Subject: [PATCH] NIFI-9563 Enabled ListenTCP Pool Receive Buffers property - Removed deprecation from ListenTCP Pool Receive Buffers property - Added BufferAllocator configuration property for NettyEventServerFactory Signed-off-by: Pierre Villard This closes #5653. --- .../configuration/BufferAllocator.java | 27 +++++++++++++++++++ .../netty/NettyEventServerFactory.java | 16 +++++++++++ .../nifi/processors/standard/ListenTCP.java | 16 ++++++----- .../processors/standard/TestListenTCP.java | 1 + 4 files changed, 53 insertions(+), 7 deletions(-) create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/configuration/BufferAllocator.java diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/configuration/BufferAllocator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/configuration/BufferAllocator.java new file mode 100644 index 0000000000..4d059d965e --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/configuration/BufferAllocator.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.event.transport.configuration; + +/** + * Byte Buffer Allocator configuration options + */ +public enum BufferAllocator { + /** Reusable pool of buffers */ + POOLED, + /** New buffer for each allocation without any pooling */ + UNPOOLED +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java index 753844fe3a..bddbccf6b4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java @@ -19,6 +19,7 @@ package org.apache.nifi.event.transport.netty; import io.netty.bootstrap.AbstractBootstrap; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; @@ -29,6 +30,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.nifi.event.transport.EventException; import org.apache.nifi.event.transport.EventServer; import org.apache.nifi.event.transport.EventServerFactory; +import org.apache.nifi.event.transport.configuration.BufferAllocator; import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod; import org.apache.nifi.event.transport.configuration.ShutdownTimeout; import org.apache.nifi.event.transport.configuration.TransportProtocol; @@ -68,6 +70,8 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev private Duration shutdownTimeout = ShutdownTimeout.DEFAULT.getDuration(); + private BufferAllocator bufferAllocator = BufferAllocator.POOLED; + public NettyEventServerFactory(final InetAddress address, final int port, final TransportProtocol protocol) { this.address = address; this.port = port; @@ -137,6 +141,15 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev this.shutdownTimeout = timeout; } + /** + * Set Buffer Allocator option overriding the default POOLED configuration + * + * @param bufferAllocator Buffer Allocator + */ + public void setBufferAllocator(final BufferAllocator bufferAllocator) { + this.bufferAllocator = Objects.requireNonNull(bufferAllocator, "Buffer Allocator required"); + } + /** * Get Event Server with Channel bound to configured address and port number * @@ -159,6 +172,9 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev if (socketKeepAlive != null) { bootstrap.option(ChannelOption.SO_KEEPALIVE, socketKeepAlive); } + if (BufferAllocator.UNPOOLED == bufferAllocator) { + bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); + } } private AbstractBootstrap getBootstrap() { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java index b0ab7f8a1b..fc7695b37a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java @@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.event.transport.EventException; import org.apache.nifi.event.transport.EventServer; +import org.apache.nifi.event.transport.configuration.BufferAllocator; import org.apache.nifi.event.transport.configuration.TransportProtocol; import org.apache.nifi.event.transport.message.ByteArrayMessage; import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory; @@ -106,13 +107,11 @@ public class ListenTCP extends AbstractProcessor { .required(false) .build(); - // Deprecated protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder() .name("pool-receive-buffers") .displayName("Pool Receive Buffers") - .description( - "This property is deprecated and no longer used.") - .required(false) + .description("Enable or disable pooling of buffers that the processor uses for handling bytes received on socket connections. The framework allocates buffers as needed during processing.") + .required(true) .defaultValue("True") .allowableValues("True", "False") .addValidator(StandardValidators.BOOLEAN_VALIDATOR) @@ -146,7 +145,6 @@ public class ListenTCP extends AbstractProcessor { descriptors.add(ListenerProperties.MESSAGE_DELIMITER); // Deprecated descriptors.add(MAX_RECV_THREAD_POOL_SIZE); - // Deprecated descriptors.add(POOL_RECV_BUFFERS); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(CLIENT_AUTH); @@ -163,8 +161,8 @@ public class ListenTCP extends AbstractProcessor { int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); - InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface); - Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue()); + final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface); + final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue()); port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger(); events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger()); errorEvents = new LinkedBlockingQueue<>(); @@ -181,6 +179,10 @@ public class ListenTCP extends AbstractProcessor { eventFactory.setClientAuth(clientAuth); } + final boolean poolReceiveBuffers = context.getProperty(POOL_RECV_BUFFERS).asBoolean(); + final BufferAllocator bufferAllocator = poolReceiveBuffers ? BufferAllocator.POOLED : BufferAllocator.UNPOOLED; + eventFactory.setBufferAllocator(bufferAllocator); + eventFactory.setSocketReceiveBuffer(socketBufferSize); eventFactory.setWorkerThreads(workerThreads); eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier())); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java index ff7cc7c1b2..327dc235a1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java @@ -100,6 +100,7 @@ public class TestListenTCP { @Test public void testRunBatching() throws Exception { runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "3"); + runner.setProperty(ListenTCP.POOL_RECV_BUFFERS, "False"); final List messages = new ArrayList<>(); messages.add("This is message 1\n");