diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java index 036161e406..3073926391 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java @@ -61,17 +61,17 @@ public class ByteArrayMessageNettyEventServerFactory extends NettyEventServerFac if (TransportProtocol.UDP.equals(protocol)) { setHandlerSupplier(() -> Arrays.asList( - logExceptionChannelHandler, new DatagramByteArrayMessageDecoder(), - byteArrayMessageChannelHandler + byteArrayMessageChannelHandler, + logExceptionChannelHandler )); } else { setHandlerSupplier(() -> Arrays.asList( - logExceptionChannelHandler, new DelimiterBasedFrameDecoder(maxFrameLength, STRIP_DELIMITER, Unpooled.wrappedBuffer(delimiter)), new ByteArrayDecoder(), new SocketByteArrayMessageDecoder(), - byteArrayMessageChannelHandler + byteArrayMessageChannelHandler, + logExceptionChannelHandler )); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java index 71219b3c28..2e005e51b5 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java @@ -39,8 +39,8 @@ public class ByteArrayNettyEventSenderFactory extends NettyEventSenderFactory handlers = new ArrayList<>(); - handlers.add(new LogExceptionChannelHandler(log)); handlers.add(new ByteArrayEncoder()); + handlers.add(new LogExceptionChannelHandler(log)); setHandlerSupplier(() -> handlers); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java index efb130b7da..483a765a46 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java @@ -43,9 +43,9 @@ public class StreamingNettyEventSenderFactory extends NettyEventSenderFactory Arrays.asList( - logExceptionChannelHandler, new ChunkedWriteHandler(), - inputStreamMessageEncoder + inputStreamMessageEncoder, + logExceptionChannelHandler )); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java index 448ee3c3eb..47999c668b 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java @@ -46,12 +46,12 @@ public class StringNettyEventSenderFactory extends NettyEventSenderFactory handlers = new ArrayList<>(); - handlers.add(new LogExceptionChannelHandler(log)); handlers.add(new StringEncoder(charset)); if (LineEnding.UNIX.equals(lineEnding)) { handlers.add(new LineEncoder(LineSeparator.UNIX, charset)); } + handlers.add(new LogExceptionChannelHandler(log)); setHandlerSupplier(() -> handlers); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java index 5810f3091d..9bcdc66147 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java @@ -126,9 +126,10 @@ public class ListenerProperties { .defaultValue("10000") .required(true) .build(); - public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() + public static final PropertyDescriptor WORKER_THREADS = new PropertyDescriptor.Builder() .name("Max Number of TCP Connections") - .description("The maximum number of concurrent TCP connections to accept.") + .displayName("Max Number of Worker Threads") + .description("The maximum number of worker threads available for servicing TCP connections.") .addValidator(StandardValidators.createLongValidator(1, 65535, true)) .defaultValue("2") .required(true) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java index 0e3cef958a..7eff677574 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java @@ -116,7 +116,7 @@ public class ListenRELP extends AbstractProcessor { @OnScheduled public void onScheduled(ProcessContext context) throws IOException { - int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger(); + int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger(); int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface); @@ -130,7 +130,7 @@ public class ListenRELP extends AbstractProcessor { messageDemarcatorBytes = msgDemarcator.getBytes(charset); final NettyEventServerFactory eventFactory = getNettyEventServerFactory(hostname, port, charset, events); eventFactory.setSocketReceiveBuffer(bufferSize); - eventFactory.setWorkerThreads(maxConnections); + eventFactory.setWorkerThreads(workerThreads); configureFactoryForSsl(context, eventFactory); try { @@ -157,7 +157,7 @@ public class ListenRELP extends AbstractProcessor { descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE); descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE); descriptors.add(ListenerProperties.CHARSET); - descriptors.add(ListenerProperties.MAX_CONNECTIONS); + descriptors.add(ListenerProperties.WORKER_THREADS); descriptors.add(ListenerProperties.MAX_BATCH_SIZE); descriptors.add(ListenerProperties.MESSAGE_DELIMITER); descriptors.add(SSL_CONTEXT_SERVICE); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java index e6f29c900a..b0ab7f8a1b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java @@ -139,10 +139,9 @@ public class ListenTCP extends AbstractProcessor { descriptors.add(ListenerProperties.PORT); descriptors.add(ListenerProperties.RECV_BUFFER_SIZE); descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE); - // Deprecated descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE); descriptors.add(ListenerProperties.CHARSET); - descriptors.add(ListenerProperties.MAX_CONNECTIONS); + descriptors.add(ListenerProperties.WORKER_THREADS); descriptors.add(ListenerProperties.MAX_BATCH_SIZE); descriptors.add(ListenerProperties.MESSAGE_DELIMITER); // Deprecated @@ -160,8 +159,9 @@ public class ListenTCP extends AbstractProcessor { @OnScheduled public void onScheduled(ProcessContext context) throws IOException { - int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger(); + int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger(); int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface); Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue()); @@ -181,8 +181,8 @@ public class ListenTCP extends AbstractProcessor { eventFactory.setClientAuth(clientAuth); } - eventFactory.setSocketReceiveBuffer(bufferSize); - eventFactory.setWorkerThreads(maxConnections); + eventFactory.setSocketReceiveBuffer(socketBufferSize); + eventFactory.setWorkerThreads(workerThreads); eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier())); try { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java index 646099d925..83e60b9b72 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java @@ -52,9 +52,9 @@ public class RELPMessageServerFactory extends NettyEventServerFactory { final RELPMessageChannelHandler relpChannelHandler = new RELPMessageChannelHandler(events, charset); setHandlerSupplier(() -> Arrays.asList( - logExceptionChannelHandler, new RELPFrameDecoder(log, charset), - relpChannelHandler + relpChannelHandler, + logExceptionChannelHandler )); } }