mirror of https://github.com/apache/nifi.git
NIFI-9563 Enabled ListenTCP Pool Receive Buffers property
- Removed deprecation from ListenTCP Pool Receive Buffers property - Added BufferAllocator configuration property for NettyEventServerFactory Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #5653.
This commit is contained in:
parent
2ffd4a5a9a
commit
7c50f1429e
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.event.transport.configuration;
|
||||
|
||||
/**
|
||||
* Byte Buffer Allocator configuration options
|
||||
*/
|
||||
public enum BufferAllocator {
|
||||
/** Reusable pool of buffers */
|
||||
POOLED,
|
||||
/** New buffer for each allocation without any pooling */
|
||||
UNPOOLED
|
||||
}
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.event.transport.netty;
|
|||
import io.netty.bootstrap.AbstractBootstrap;
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelOption;
|
||||
|
@ -29,6 +30,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
|
|||
import org.apache.nifi.event.transport.EventException;
|
||||
import org.apache.nifi.event.transport.EventServer;
|
||||
import org.apache.nifi.event.transport.EventServerFactory;
|
||||
import org.apache.nifi.event.transport.configuration.BufferAllocator;
|
||||
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
|
||||
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
|
||||
import org.apache.nifi.event.transport.configuration.TransportProtocol;
|
||||
|
@ -68,6 +70,8 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
|
|||
|
||||
private Duration shutdownTimeout = ShutdownTimeout.DEFAULT.getDuration();
|
||||
|
||||
private BufferAllocator bufferAllocator = BufferAllocator.POOLED;
|
||||
|
||||
public NettyEventServerFactory(final InetAddress address, final int port, final TransportProtocol protocol) {
|
||||
this.address = address;
|
||||
this.port = port;
|
||||
|
@ -137,6 +141,15 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
|
|||
this.shutdownTimeout = timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set Buffer Allocator option overriding the default POOLED configuration
|
||||
*
|
||||
* @param bufferAllocator Buffer Allocator
|
||||
*/
|
||||
public void setBufferAllocator(final BufferAllocator bufferAllocator) {
|
||||
this.bufferAllocator = Objects.requireNonNull(bufferAllocator, "Buffer Allocator required");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Event Server with Channel bound to configured address and port number
|
||||
*
|
||||
|
@ -159,6 +172,9 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
|
|||
if (socketKeepAlive != null) {
|
||||
bootstrap.option(ChannelOption.SO_KEEPALIVE, socketKeepAlive);
|
||||
}
|
||||
if (BufferAllocator.UNPOOLED == bufferAllocator) {
|
||||
bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
|
||||
}
|
||||
}
|
||||
|
||||
private AbstractBootstrap<?, ?> getBootstrap() {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationContext;
|
|||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.event.transport.EventException;
|
||||
import org.apache.nifi.event.transport.EventServer;
|
||||
import org.apache.nifi.event.transport.configuration.BufferAllocator;
|
||||
import org.apache.nifi.event.transport.configuration.TransportProtocol;
|
||||
import org.apache.nifi.event.transport.message.ByteArrayMessage;
|
||||
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
|
||||
|
@ -106,13 +107,11 @@ public class ListenTCP extends AbstractProcessor {
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
// Deprecated
|
||||
protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
|
||||
.name("pool-receive-buffers")
|
||||
.displayName("Pool Receive Buffers")
|
||||
.description(
|
||||
"This property is deprecated and no longer used.")
|
||||
.required(false)
|
||||
.description("Enable or disable pooling of buffers that the processor uses for handling bytes received on socket connections. The framework allocates buffers as needed during processing.")
|
||||
.required(true)
|
||||
.defaultValue("True")
|
||||
.allowableValues("True", "False")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
|
@ -146,7 +145,6 @@ public class ListenTCP extends AbstractProcessor {
|
|||
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
|
||||
// Deprecated
|
||||
descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
|
||||
// Deprecated
|
||||
descriptors.add(POOL_RECV_BUFFERS);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(CLIENT_AUTH);
|
||||
|
@ -163,8 +161,8 @@ public class ListenTCP extends AbstractProcessor {
|
|||
int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
|
||||
InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
|
||||
Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
|
||||
final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
|
||||
final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
|
||||
port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
|
||||
events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
|
||||
errorEvents = new LinkedBlockingQueue<>();
|
||||
|
@ -181,6 +179,10 @@ public class ListenTCP extends AbstractProcessor {
|
|||
eventFactory.setClientAuth(clientAuth);
|
||||
}
|
||||
|
||||
final boolean poolReceiveBuffers = context.getProperty(POOL_RECV_BUFFERS).asBoolean();
|
||||
final BufferAllocator bufferAllocator = poolReceiveBuffers ? BufferAllocator.POOLED : BufferAllocator.UNPOOLED;
|
||||
eventFactory.setBufferAllocator(bufferAllocator);
|
||||
|
||||
eventFactory.setSocketReceiveBuffer(socketBufferSize);
|
||||
eventFactory.setWorkerThreads(workerThreads);
|
||||
eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
|
||||
|
|
|
@ -100,6 +100,7 @@ public class TestListenTCP {
|
|||
@Test
|
||||
public void testRunBatching() throws Exception {
|
||||
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "3");
|
||||
runner.setProperty(ListenTCP.POOL_RECV_BUFFERS, "False");
|
||||
|
||||
final List<String> messages = new ArrayList<>();
|
||||
messages.add("This is message 1\n");
|
||||
|
|
Loading…
Reference in New Issue