NIFI-9478 Moved Netty Log Exception Handler to end of pipeline

- Changed display name of Max Number of TCP Connections to Max Number of Worker Threads for ListenTCP
- Set Netty Socket Receive Buffer using Max Socket Buffer Size in ListenTCP

This closes #5599

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nathan Gough 2021-12-13 12:31:26 -05:00 committed by exceptionfactory
parent 90930ca197
commit 5e9c86885c
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
8 changed files with 21 additions and 20 deletions

View File

@ -61,17 +61,17 @@ public class ByteArrayMessageNettyEventServerFactory extends NettyEventServerFac
if (TransportProtocol.UDP.equals(protocol)) {
setHandlerSupplier(() -> Arrays.asList(
logExceptionChannelHandler,
new DatagramByteArrayMessageDecoder(),
byteArrayMessageChannelHandler
byteArrayMessageChannelHandler,
logExceptionChannelHandler
));
} else {
setHandlerSupplier(() -> Arrays.asList(
logExceptionChannelHandler,
new DelimiterBasedFrameDecoder(maxFrameLength, STRIP_DELIMITER, Unpooled.wrappedBuffer(delimiter)),
new ByteArrayDecoder(),
new SocketByteArrayMessageDecoder(),
byteArrayMessageChannelHandler
byteArrayMessageChannelHandler,
logExceptionChannelHandler
));
}
}

View File

@ -39,8 +39,8 @@ public class ByteArrayNettyEventSenderFactory extends NettyEventSenderFactory<by
public ByteArrayNettyEventSenderFactory(final ComponentLog log, final String address, final int port, final TransportProtocol protocol) {
super(address, port, protocol);
final List<ChannelHandler> handlers = new ArrayList<>();
handlers.add(new LogExceptionChannelHandler(log));
handlers.add(new ByteArrayEncoder());
handlers.add(new LogExceptionChannelHandler(log));
setHandlerSupplier(() -> handlers);
}
}

View File

@ -43,9 +43,9 @@ public class StreamingNettyEventSenderFactory extends NettyEventSenderFactory<In
final InputStreamMessageEncoder inputStreamMessageEncoder = new InputStreamMessageEncoder();
setHandlerSupplier(() -> Arrays.asList(
logExceptionChannelHandler,
new ChunkedWriteHandler(),
inputStreamMessageEncoder
inputStreamMessageEncoder,
logExceptionChannelHandler
));
}
}

View File

@ -46,12 +46,12 @@ public class StringNettyEventSenderFactory extends NettyEventSenderFactory<Strin
public StringNettyEventSenderFactory(final ComponentLog log, final String address, final int port, final TransportProtocol protocol, final Charset charset, final LineEnding lineEnding) {
super(address, port, protocol);
final List<ChannelHandler> handlers = new ArrayList<>();
handlers.add(new LogExceptionChannelHandler(log));
handlers.add(new StringEncoder(charset));
if (LineEnding.UNIX.equals(lineEnding)) {
handlers.add(new LineEncoder(LineSeparator.UNIX, charset));
}
handlers.add(new LogExceptionChannelHandler(log));
setHandlerSupplier(() -> handlers);
}
}

View File

@ -126,9 +126,10 @@ public class ListenerProperties {
.defaultValue("10000")
.required(true)
.build();
public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
public static final PropertyDescriptor WORKER_THREADS = new PropertyDescriptor.Builder()
.name("Max Number of TCP Connections")
.description("The maximum number of concurrent TCP connections to accept.")
.displayName("Max Number of Worker Threads")
.description("The maximum number of worker threads available for servicing TCP connections.")
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2")
.required(true)

View File

@ -116,7 +116,7 @@ public class ListenRELP extends AbstractProcessor {
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
@ -130,7 +130,7 @@ public class ListenRELP extends AbstractProcessor {
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
final NettyEventServerFactory eventFactory = getNettyEventServerFactory(hostname, port, charset, events);
eventFactory.setSocketReceiveBuffer(bufferSize);
eventFactory.setWorkerThreads(maxConnections);
eventFactory.setWorkerThreads(workerThreads);
configureFactoryForSsl(context, eventFactory);
try {
@ -157,7 +157,7 @@ public class ListenRELP extends AbstractProcessor {
descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
descriptors.add(ListenerProperties.CHARSET);
descriptors.add(ListenerProperties.MAX_CONNECTIONS);
descriptors.add(ListenerProperties.WORKER_THREADS);
descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
descriptors.add(SSL_CONTEXT_SERVICE);

View File

@ -139,10 +139,9 @@ public class ListenTCP extends AbstractProcessor {
descriptors.add(ListenerProperties.PORT);
descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
// Deprecated
descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
descriptors.add(ListenerProperties.CHARSET);
descriptors.add(ListenerProperties.MAX_CONNECTIONS);
descriptors.add(ListenerProperties.WORKER_THREADS);
descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
// Deprecated
@ -160,8 +159,9 @@ public class ListenTCP extends AbstractProcessor {
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
@ -181,8 +181,8 @@ public class ListenTCP extends AbstractProcessor {
eventFactory.setClientAuth(clientAuth);
}
eventFactory.setSocketReceiveBuffer(bufferSize);
eventFactory.setWorkerThreads(maxConnections);
eventFactory.setSocketReceiveBuffer(socketBufferSize);
eventFactory.setWorkerThreads(workerThreads);
eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
try {

View File

@ -52,9 +52,9 @@ public class RELPMessageServerFactory extends NettyEventServerFactory {
final RELPMessageChannelHandler relpChannelHandler = new RELPMessageChannelHandler(events, charset);
setHandlerSupplier(() -> Arrays.asList(
logExceptionChannelHandler,
new RELPFrameDecoder(log, charset),
relpChannelHandler
relpChannelHandler,
logExceptionChannelHandler
));
}
}