From 39f8a008d4bde2024eb804ff7017542a6e86b572 Mon Sep 17 00:00:00 2001 From: Bence Simon Date: Wed, 25 Nov 2020 21:53:01 +0100 Subject: [PATCH] NIFI-8039 Adding properties to ListenTCP in order to allow refine behaviour under higher load; Refining thread pool for better scalability NIFI-8039 Review findings; refining thread pool to be able to scale down properly when not under load NIFI-8039 Answers to PR comments This closes #4689. Signed-off-by: Peter Turcsanyi --- .../nifi/processors/beats/ListenBeats.java | 28 ++++--- .../TestBeatsSocketChannelHandler.java | 9 +- .../listen/AbstractListenEventProcessor.java | 20 +---- .../listen/dispatcher/ByteBufferFactory.java | 37 +++++++++ .../listen/dispatcher/ByteBufferPool.java | 55 +++++++++++++ .../listen/dispatcher/ByteBufferSource.java | 39 +++++++++ .../dispatcher/DatagramChannelDispatcher.java | 22 ++--- .../dispatcher/SocketChannelDispatcher.java | 82 +++++++++++-------- .../lumberjack/ListenLumberjack.java | 28 ++++--- .../ITLumberjackSocketChannelHandler.java | 36 ++++---- .../nifi/processors/standard/ListenRELP.java | 28 ++++--- .../processors/standard/ListenSyslog.java | 62 +++++++------- .../nifi/processors/standard/ListenTCP.java | 80 ++++++++++++++---- .../nifi/processors/standard/ListenUDP.java | 7 +- .../processors/standard/ListenUDPRecord.java | 7 +- .../handler/TestRELPSocketChannelHandler.java | 9 +- 16 files changed, 366 insertions(+), 183 deletions(-) create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java index eab3e76e15..bedca277ed 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java @@ -16,17 +16,6 @@ */ package org.apache.nifi.processors.beats; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import javax.net.ssl.SSLContext; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; import org.apache.nifi.processor.util.listen.event.EventFactory; @@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"listen", "beats", "tcp", "logs"}) @CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload " + @@ -147,7 +149,7 @@ public class ListenBeats extends AbstractListenEventBatchingProcessor bufferPool = createBufferPool(maxConnections, bufferSize); + final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize); // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher SSLContext sslContext = null; @@ -161,7 +163,7 @@ public class ListenBeats extends AbstractListenEventBatchingProcessor(eventFactory, handlerFactory, bufferPool, events, + return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events, getLogger(), maxConnections, sslContext, clientAuth, charSet); } diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java index 44f73a293c..7c6c1cb287 100644 --- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java @@ -33,6 +33,8 @@ import javax.xml.bind.DatatypeConverter; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; import org.apache.nifi.processor.util.listen.event.Event; @@ -51,7 +53,7 @@ import org.mockito.Mockito; public class TestBeatsSocketChannelHandler { private EventFactory eventFactory; private ChannelHandlerFactory channelHandlerFactory; - private BlockingQueue byteBuffers; + private ByteBufferSource byteBufferSource; private BlockingQueue events; private ComponentLog logger = Mockito.mock(ComponentLog.class); private int maxConnections; @@ -64,8 +66,7 @@ public class TestBeatsSocketChannelHandler { eventFactory = new TestEventHolderFactory(); channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>(); - byteBuffers = new LinkedBlockingQueue<>(); - byteBuffers.add(ByteBuffer.allocate(4096)); + byteBufferSource = new ByteBufferPool(1, 4096); events = new LinkedBlockingQueue<>(); logger = Mockito.mock(ComponentLog.class); @@ -74,7 +75,7 @@ public class TestBeatsSocketChannelHandler { sslContext = null; charset = StandardCharsets.UTF_8; - dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger, + dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger, maxConnections, sslContext, charset); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java index 20df8687d0..edba61a4d7 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.processor.util.listen; -import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; @@ -36,7 +34,6 @@ import org.apache.nifi.processor.util.listen.event.Event; import java.io.IOException; import java.net.InetAddress; import java.net.NetworkInterface; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; @@ -47,6 +44,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; + /** * An abstract processor to extend from when listening for events over a channel. This processor * will start a ChannelDispatcher, and optionally a ChannelResponseDispatcher, in a background @@ -228,21 +227,6 @@ public abstract class AbstractListenEventProcessor extends Abst } } - /** - * Creates a pool of ByteBuffers with the given size. - * - * @param poolSize the number of buffers to initialize the pool with - * @param bufferSize the size of each buffer - * @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize - */ - protected BlockingQueue createBufferPool(final int poolSize, final int bufferSize) { - final LinkedBlockingQueue bufferPool = new LinkedBlockingQueue<>(poolSize); - for (int i = 0; i < poolSize; i++) { - bufferPool.offer(ByteBuffer.allocate(bufferSize)); - } - return bufferPool; - } - /** * If pollErrorQueue is true, the error queue will be checked first and event will be * returned from the error queue if available. diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java new file mode 100644 index 0000000000..86a035a348 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import java.nio.ByteBuffer; + +public class ByteBufferFactory implements ByteBufferSource { + private final int bufferSize; + + public ByteBufferFactory(final int bufferSize) { + this.bufferSize = bufferSize; + } + + @Override + public ByteBuffer acquire() { + return ByteBuffer.allocate(bufferSize); + } + + @Override + public void release(final ByteBuffer byteBuffer) { + // nothing to do + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java new file mode 100644 index 0000000000..42f476f6ce --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class ByteBufferPool implements ByteBufferSource { + + private final BlockingQueue pool; + + public ByteBufferPool(final int poolSize, final int bufferSize) { + if (poolSize <= 0) { + throw new IllegalArgumentException("A pool of available ByteBuffers is required"); + } + + this.pool = new LinkedBlockingQueue<>(poolSize); + + for (int i = 0; i < poolSize; i++) { + pool.offer(ByteBuffer.allocate(bufferSize)); + } + } + + @Override + public ByteBuffer acquire() { + final ByteBuffer buffer = pool.poll(); + buffer.clear(); + buffer.mark(); + return buffer; + } + + @Override + public void release(final ByteBuffer byteBuffer) { + try { + pool.put(byteBuffer); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java new file mode 100644 index 0000000000..002827dee9 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import java.nio.ByteBuffer; + +/** + * Manages byte buffers for the dispatchers. + */ +public interface ByteBufferSource { + + /** + * @return Returns a buffer for usage. The buffer can be pooled or created on demand depending on the implementation. + * If the source is not capable to provide an instance, it returns {@code null} instead. + */ + ByteBuffer acquire(); + + /** + * With calling this method the client releases the buffer. It might be reused by the handler and not to be used + * by this client any more. + * + * @param byteBuffer The byte buffer the client acquired previously. + */ + void release(ByteBuffer byteBuffer); +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java index 69a19982ee..cd721e6fb0 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java @@ -43,7 +43,7 @@ import java.util.concurrent.BlockingQueue; public class DatagramChannelDispatcher> implements ChannelDispatcher { private final EventFactory eventFactory; - private final BlockingQueue bufferPool; + private final ByteBufferSource bufferSource; private final EventQueue events; private final ComponentLog logger; private final String sendingHost; @@ -54,28 +54,24 @@ public class DatagramChannelDispatcher> impleme private volatile boolean stopped = false; public DatagramChannelDispatcher(final EventFactory eventFactory, - final BlockingQueue bufferPool, + final ByteBufferSource bufferSource, final BlockingQueue events, final ComponentLog logger) { - this(eventFactory, bufferPool, events, logger, null, null); + this(eventFactory, bufferSource, events, logger, null, null); } public DatagramChannelDispatcher(final EventFactory eventFactory, - final BlockingQueue bufferPool, + final ByteBufferSource bufferSource, final BlockingQueue events, final ComponentLog logger, final String sendingHost, final Integer sendingPort) { this.eventFactory = eventFactory; - this.bufferPool = bufferPool; + this.bufferSource = bufferSource; this.logger = logger; this.sendingHost = sendingHost; this.sendingPort = sendingPort; this.events = new EventQueue<>(events, logger); - - if (bufferPool == null || bufferPool.size() == 0) { - throw new IllegalArgumentException("A pool of available ByteBuffers is required"); - } } @Override @@ -110,7 +106,7 @@ public class DatagramChannelDispatcher> impleme @Override public void run() { - final ByteBuffer buffer = bufferPool.poll(); + final ByteBuffer buffer = bufferSource.acquire(); while (!stopped) { try { int selected = selector.select(); @@ -155,11 +151,7 @@ public class DatagramChannelDispatcher> impleme } if (buffer != null) { - try { - bufferPool.put(buffer); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + bufferSource.release(buffer); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java index d0be2563f9..41e1a70e20 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java @@ -16,6 +16,17 @@ */ package org.apache.nifi.processor.util.listen.dispatcher; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.security.util.ClientAuth; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -29,20 +40,10 @@ import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.security.util.ClientAuth; /** * Accepts Socket connections on the given port and creates a handler for each connection to @@ -52,15 +53,16 @@ public class SocketChannelDispatcher> implements private final EventFactory eventFactory; private final ChannelHandlerFactory handlerFactory; - private final BlockingQueue bufferPool; + private final ByteBufferSource bufferSource; private final BlockingQueue events; private final ComponentLog logger; private final int maxConnections; + private final int maxThreadPoolSize; private final SSLContext sslContext; private final ClientAuth clientAuth; private final Charset charset; - private ExecutorService executor; + private ThreadPoolExecutor executor; private volatile boolean stopped = false; private Selector selector; private final BlockingQueue keyQueue; @@ -68,45 +70,63 @@ public class SocketChannelDispatcher> implements public SocketChannelDispatcher(final EventFactory eventFactory, final ChannelHandlerFactory handlerFactory, - final BlockingQueue bufferPool, + final ByteBufferSource bufferSource, final BlockingQueue events, final ComponentLog logger, final int maxConnections, final SSLContext sslContext, final Charset charset) { - this(eventFactory, handlerFactory, bufferPool, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset); + this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset); } public SocketChannelDispatcher(final EventFactory eventFactory, final ChannelHandlerFactory handlerFactory, - final BlockingQueue bufferPool, + final ByteBufferSource bufferSource, final BlockingQueue events, final ComponentLog logger, final int maxConnections, final SSLContext sslContext, final ClientAuth clientAuth, final Charset charset) { + this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, maxConnections, sslContext, clientAuth, charset); + } + + public SocketChannelDispatcher(final EventFactory eventFactory, + final ChannelHandlerFactory handlerFactory, + final ByteBufferSource bufferSource, + final BlockingQueue events, + final ComponentLog logger, + final int maxConnections, + final int maxThreadPoolSize, + final SSLContext sslContext, + final ClientAuth clientAuth, + final Charset charset) { this.eventFactory = eventFactory; this.handlerFactory = handlerFactory; - this.bufferPool = bufferPool; + this.bufferSource = bufferSource; this.events = events; this.logger = logger; this.maxConnections = maxConnections; + this.maxThreadPoolSize = maxThreadPoolSize; this.keyQueue = new LinkedBlockingQueue<>(maxConnections); this.sslContext = sslContext; this.clientAuth = clientAuth; this.charset = charset; - - if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) { - throw new IllegalArgumentException( - "A pool of available ByteBuffers equal to the maximum number of connections is required"); - } } @Override public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { + final InetSocketAddress inetSocketAddress = new InetSocketAddress(nicAddress, port); + stopped = false; - executor = Executors.newFixedThreadPool(maxConnections); + executor = new ThreadPoolExecutor( + maxThreadPoolSize, + maxThreadPoolSize, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + "-worker-%d").build()); + executor.allowCoreThreadTimeOut(true); final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); @@ -120,7 +140,7 @@ public class SocketChannelDispatcher> implements } } - serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port)); + serverSocketChannel.socket().bind(inetSocketAddress); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); @@ -161,9 +181,7 @@ public class SocketChannelDispatcher> implements SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); // Prepare the byte buffer for the reads, clear it out - ByteBuffer buffer = bufferPool.poll(); - buffer.clear(); - buffer.mark(); + ByteBuffer buffer = bufferSource.acquire(); // If we have an SSLContext then create an SSLEngine for the channel SSLSocketChannel sslSocketChannel = null; @@ -265,13 +283,9 @@ public class SocketChannelDispatcher> implements @Override public void completeConnection(SelectionKey key) { - // connection is done. Return the buffer to the pool - SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - try { - bufferPool.put(attachment.getByteBuffer()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + // connection is done. Releasing buffer + final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + bufferSource.release(attachment.getByteBuffer()); currentConnections.decrementAndGet(); } diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java index 7ff65ee0a1..9ce0a76123 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java @@ -17,17 +17,6 @@ package org.apache.nifi.processors.lumberjack; import com.google.gson.Gson; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import javax.net.ssl.SSLContext; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; import org.apache.nifi.processor.util.listen.event.EventFactory; @@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + @Deprecated @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"listen", "lumberjack", "tcp", "logs"}) @@ -135,7 +137,7 @@ public class ListenLumberjack extends AbstractListenEventBatchingProcessor bufferPool = createBufferPool(maxConnections, bufferSize); + final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize); // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher SSLContext sslContext = null; @@ -145,7 +147,7 @@ public class ListenLumberjack extends AbstractListenEventBatchingProcessor(eventFactory, handlerFactory, bufferPool, events, + return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events, getLogger(), maxConnections, sslContext, charSet); } diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java index 0ef1e416dc..45e74b6fa8 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java @@ -16,21 +16,10 @@ */ package org.apache.nifi.processors.lumberjack.handler; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import javax.net.ssl.SSLContext; -import javax.xml.bind.DatatypeConverter; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; import org.apache.nifi.processor.util.listen.event.Event; @@ -43,11 +32,25 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import javax.net.ssl.SSLContext; +import javax.xml.bind.DatatypeConverter; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + @SuppressWarnings("deprecation") public class ITLumberjackSocketChannelHandler { private EventFactory eventFactory; private ChannelHandlerFactory channelHandlerFactory; - private BlockingQueue byteBuffers; + private ByteBufferSource byteBufferSource; private BlockingQueue events; private ComponentLog logger = Mockito.mock(ComponentLog.class); private int maxConnections; @@ -60,8 +63,7 @@ public class ITLumberjackSocketChannelHandler { eventFactory = new TestEventHolderFactory(); channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>(); - byteBuffers = new LinkedBlockingQueue<>(); - byteBuffers.add(ByteBuffer.allocate(4096)); + byteBufferSource = new ByteBufferPool(1, 4096); events = new LinkedBlockingQueue<>(); logger = Mockito.mock(ComponentLog.class); @@ -70,7 +72,7 @@ public class ITLumberjackSocketChannelHandler { sslContext = null; charset = StandardCharsets.UTF_8; - dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger, + dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger, maxConnections, sslContext, charset); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java index 09a68eb35b..53477a670b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java @@ -16,17 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import javax.net.ssl.SSLContext; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; import org.apache.nifi.processor.util.listen.event.EventFactory; @@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"listen", "relp", "tcp", "logs"}) @CapabilityDescription("Listens for RELP messages being sent to a given port over TCP. Each message will be " + @@ -135,7 +137,7 @@ public class ListenRELP extends AbstractListenEventBatchingProcessor final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); // initialize the buffer pool based on max number of connections and the buffer size - final BlockingQueue bufferPool = createBufferPool(maxConnections, bufferSize); + final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize); // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher SSLContext sslContext = null; @@ -150,7 +152,7 @@ public class ListenRELP extends AbstractListenEventBatchingProcessor } // if we decide to support SSL then get the context and pass it in here - return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events, + return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events, getLogger(), maxConnections, sslContext, clientAuth, charSet); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 77a9a28007..4cc7b333c9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -16,28 +16,6 @@ */ package org.apache.nifi.processors.standard; -import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; - -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.nio.ByteBuffer; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLContext; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -62,6 +40,8 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; @@ -77,6 +57,28 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes; import org.apache.nifi.syslog.events.SyslogEvent; import org.apache.nifi.syslog.parsers.SyslogParser; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; + @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @@ -202,7 +204,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { private volatile ChannelDispatcher channelDispatcher; private volatile SyslogParser parser; - private volatile BlockingQueue bufferPool; + private volatile ByteBufferSource byteBufferSource; private volatile BlockingQueue syslogEvents; private final BlockingQueue errorEvents = new LinkedBlockingQueue<>(); private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents. @@ -303,11 +305,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); } - bufferPool = new LinkedBlockingQueue<>(maxConnections); - for (int i = 0; i < maxConnections; i++) { - bufferPool.offer(ByteBuffer.allocate(bufferSize)); - } - + byteBufferSource = new ByteBufferPool(maxConnections, bufferSize); parser = new SyslogParser(Charset.forName(charSet)); syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize); @@ -319,7 +317,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { // create either a UDP or TCP reader and call open() to bind to the given port final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - channelDispatcher = createChannelReader(context, protocol, bufferPool, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet)); + channelDispatcher = createChannelReader(context, protocol, byteBufferSource, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet)); channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize); final Thread readerThread = new Thread(channelDispatcher); @@ -334,14 +332,14 @@ public class ListenSyslog extends AbstractSyslogProcessor { } // visible for testing to be overridden and provide a mock ChannelDispatcher if desired - protected ChannelDispatcher createChannelReader(final ProcessContext context, final String protocol, final BlockingQueue bufferPool, + protected ChannelDispatcher createChannelReader(final ProcessContext context, final String protocol, final ByteBufferSource byteBufferSource, final BlockingQueue events, final int maxConnections, final SSLContextService sslContextService, final Charset charset) throws IOException { final EventFactory eventFactory = new RawSyslogEventFactory(); if (UDP_VALUE.getValue().equals(protocol)) { - return new DatagramChannelDispatcher(eventFactory, bufferPool, events, getLogger()); + return new DatagramChannelDispatcher(eventFactory, byteBufferSource, events, getLogger()); } else { // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher SSLContext sslContext = null; @@ -354,7 +352,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { } final ChannelHandlerFactory, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>(); - return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charset); + return new SocketChannelDispatcher(eventFactory, handlerFactory, byteBufferSource, events, getLogger(), maxConnections, sslContext, clientAuth, charset); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java index 8359221283..7aac89930f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java @@ -16,18 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import javax.net.ssl.SSLContext; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -40,8 +28,12 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferFactory; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; import org.apache.nifi.processor.util.listen.event.EventFactory; @@ -53,6 +45,18 @@ import org.apache.nifi.security.util.ClientAuth; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"listen", "tcp", "tls", "ssl"}) @@ -83,10 +87,37 @@ public class ListenTCP extends AbstractListenEventBatchingProcessor getAdditionalProperties() { return Arrays.asList( MAX_CONNECTIONS, + MAX_RECV_THREAD_POOL_SIZE, + POOL_RECV_BUFFERS, SSL_CONTEXT_SERVICE, CLIENT_AUTH ); @@ -105,6 +136,20 @@ public class ListenTCP extends AbstractListenEventBatchingProcessor maxConnections) { + results.add(new ValidationResult.Builder() + .explanation("\"" + MAX_RECV_THREAD_POOL_SIZE.getDisplayName() + "\" cannot be bigger than \"" + MAX_CONNECTIONS.getDisplayName() + "\"") + .valid(false) + .subject(MAX_RECV_THREAD_POOL_SIZE.getDisplayName()) + .build()); + } + } + return results; } @@ -113,11 +158,17 @@ public class ListenTCP extends AbstractListenEventBatchingProcessor bufferPool = createBufferPool(maxConnections, bufferSize); + final ByteBufferSource byteBufferSource = context.getProperty(POOL_RECV_BUFFERS).asBoolean() + ? new ByteBufferPool(maxConnections, bufferSize) + : new ByteBufferFactory(bufferSize); // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher SSLContext sslContext = null; @@ -132,7 +183,8 @@ public class ListenTCP extends AbstractListenEventBatchingProcessor eventFactory = new StandardEventFactory(); final ChannelHandlerFactory, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>(); - return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charSet); + return new SocketChannelDispatcher(eventFactory, handlerFactory, byteBufferSource, events, getLogger(), maxConnections, + maxThreadPoolSize, sslContext, clientAuth, charSet); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java index c8ffecb8df..76795f0685 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java @@ -33,6 +33,8 @@ import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher; import org.apache.nifi.processor.util.listen.event.EventFactory; @@ -42,7 +44,6 @@ import org.apache.nifi.processor.util.listen.event.StandardEventFactory; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -124,9 +125,9 @@ public class ListenUDP extends AbstractListenEventBatchingProcessor bufferPool = createBufferPool(context.getMaxConcurrentTasks(), bufferSize); + final ByteBufferSource byteBufferSource = new ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize); final EventFactory eventFactory = new StandardEventFactory(); - return new DatagramChannelDispatcher<>(eventFactory, bufferPool, events, getLogger(), sendingHost, sendingHostPort); + return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, events, getLogger(), sendingHost, sendingHostPort); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java index 6a723ea5cc..2a20d5d45e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java @@ -39,6 +39,8 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher; import org.apache.nifi.processor.util.listen.event.EventFactory; @@ -58,7 +60,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.UnknownHostException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -214,9 +215,9 @@ public class ListenUDPRecord extends AbstractListenEventProcessor final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue(); final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger(); final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); - final BlockingQueue bufferPool = createBufferPool(context.getMaxConcurrentTasks(), bufferSize); + final ByteBufferSource byteBufferSource = new ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize); final EventFactory eventFactory = new StandardEventFactory(); - return new DatagramChannelDispatcher<>(eventFactory, bufferPool, events, getLogger(), sendingHost, sendingHostPort); + return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, events, getLogger(), sendingHost, sendingHostPort); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java index 5bfaca7f81..d7b7d5ca71 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java @@ -19,6 +19,8 @@ package org.apache.nifi.processors.standard.relp.handler; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool; +import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; import org.apache.nifi.processor.util.listen.event.Event; @@ -48,7 +50,7 @@ public class TestRELPSocketChannelHandler { private EventFactory eventFactory; private ChannelHandlerFactory channelHandlerFactory; - private BlockingQueue byteBuffers; + private ByteBufferSource byteBufferSource; private BlockingQueue events; private ComponentLog logger = Mockito.mock(ComponentLog.class); private int maxConnections; @@ -61,8 +63,7 @@ public class TestRELPSocketChannelHandler { eventFactory = new TestEventHolderFactory(); channelHandlerFactory = new RELPSocketChannelHandlerFactory<>(); - byteBuffers = new LinkedBlockingQueue<>(); - byteBuffers.add(ByteBuffer.allocate(4096)); + byteBufferSource = new ByteBufferPool(1, 4096); events = new LinkedBlockingQueue<>(); logger = Mockito.mock(ComponentLog.class); @@ -71,7 +72,7 @@ public class TestRELPSocketChannelHandler { sslContext = null; charset = StandardCharsets.UTF_8; - dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger, + dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger, maxConnections, sslContext, charset); }