NIFI-8039 Adding properties to ListenTCP in order to allow refine behaviour under higher load; Refining thread pool for better scalability

NIFI-8039 Review findings; refining thread pool to be able to scale down properly when not under load
NIFI-8039 Answers to PR comments

This closes #4689.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Bence Simon 2020-11-25 21:53:01 +01:00 committed by Peter Turcsanyi
parent 3625a73d91
commit 39f8a008d4
16 changed files with 366 additions and 183 deletions

View File

@ -16,17 +16,6 @@
*/
package org.apache.nifi.processors.beats;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "beats", "tcp", "logs"})
@CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload " +
@ -147,7 +149,7 @@ public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
@ -161,7 +163,7 @@ public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent
}
// if we decide to support SSL then get the context and pass it in here
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events,
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events,
getLogger(), maxConnections, sslContext, clientAuth, charSet);
}

View File

@ -33,6 +33,8 @@ import javax.xml.bind.DatatypeConverter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
@ -51,7 +53,7 @@ import org.mockito.Mockito;
public class TestBeatsSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
private BlockingQueue<ByteBuffer> byteBuffers;
private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
@ -64,8 +66,7 @@ public class TestBeatsSocketChannelHandler {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>();
byteBuffers = new LinkedBlockingQueue<>();
byteBuffers.add(ByteBuffer.allocate(4096));
byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
@ -74,7 +75,7 @@ public class TestBeatsSocketChannelHandler {
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.processor.util.listen;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@ -36,7 +34,6 @@ import org.apache.nifi.processor.util.listen.event.Event;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
@ -47,6 +44,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
/**
* An abstract processor to extend from when listening for events over a channel. This processor
* will start a ChannelDispatcher, and optionally a ChannelResponseDispatcher, in a background
@ -228,21 +227,6 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
}
}
/**
* Creates a pool of ByteBuffers with the given size.
*
* @param poolSize the number of buffers to initialize the pool with
* @param bufferSize the size of each buffer
* @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize
*/
protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, final int bufferSize) {
final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
return bufferPool;
}
/**
* If pollErrorQueue is true, the error queue will be checked first and event will be
* returned from the error queue if available.

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.dispatcher;
import java.nio.ByteBuffer;
public class ByteBufferFactory implements ByteBufferSource {
private final int bufferSize;
public ByteBufferFactory(final int bufferSize) {
this.bufferSize = bufferSize;
}
@Override
public ByteBuffer acquire() {
return ByteBuffer.allocate(bufferSize);
}
@Override
public void release(final ByteBuffer byteBuffer) {
// nothing to do
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.dispatcher;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ByteBufferPool implements ByteBufferSource {
private final BlockingQueue<ByteBuffer> pool;
public ByteBufferPool(final int poolSize, final int bufferSize) {
if (poolSize <= 0) {
throw new IllegalArgumentException("A pool of available ByteBuffers is required");
}
this.pool = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
pool.offer(ByteBuffer.allocate(bufferSize));
}
}
@Override
public ByteBuffer acquire() {
final ByteBuffer buffer = pool.poll();
buffer.clear();
buffer.mark();
return buffer;
}
@Override
public void release(final ByteBuffer byteBuffer) {
try {
pool.put(byteBuffer);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.dispatcher;
import java.nio.ByteBuffer;
/**
* Manages byte buffers for the dispatchers.
*/
public interface ByteBufferSource {
/**
* @return Returns a buffer for usage. The buffer can be pooled or created on demand depending on the implementation.
* If the source is not capable to provide an instance, it returns {@code null} instead.
*/
ByteBuffer acquire();
/**
* With calling this method the client releases the buffer. It might be reused by the handler and not to be used
* by this client any more.
*
* @param byteBuffer The byte buffer the client acquired previously.
*/
void release(ByteBuffer byteBuffer);
}

View File

@ -43,7 +43,7 @@ import java.util.concurrent.BlockingQueue;
public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> implements ChannelDispatcher {
private final EventFactory<E> eventFactory;
private final BlockingQueue<ByteBuffer> bufferPool;
private final ByteBufferSource bufferSource;
private final EventQueue<E> events;
private final ComponentLog logger;
private final String sendingHost;
@ -54,28 +54,24 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
private volatile boolean stopped = false;
public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
final BlockingQueue<ByteBuffer> bufferPool,
final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger) {
this(eventFactory, bufferPool, events, logger, null, null);
this(eventFactory, bufferSource, events, logger, null, null);
}
public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
final BlockingQueue<ByteBuffer> bufferPool,
final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final String sendingHost,
final Integer sendingPort) {
this.eventFactory = eventFactory;
this.bufferPool = bufferPool;
this.bufferSource = bufferSource;
this.logger = logger;
this.sendingHost = sendingHost;
this.sendingPort = sendingPort;
this.events = new EventQueue<>(events, logger);
if (bufferPool == null || bufferPool.size() == 0) {
throw new IllegalArgumentException("A pool of available ByteBuffers is required");
}
}
@Override
@ -110,7 +106,7 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
@Override
public void run() {
final ByteBuffer buffer = bufferPool.poll();
final ByteBuffer buffer = bufferSource.acquire();
while (!stopped) {
try {
int selected = selector.select();
@ -155,11 +151,7 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
}
if (buffer != null) {
try {
bufferPool.put(buffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
bufferSource.release(buffer);
}
}

View File

@ -16,6 +16,17 @@
*/
package org.apache.nifi.processor.util.listen.dispatcher;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.security.util.ClientAuth;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -29,20 +40,10 @@ import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.security.util.ClientAuth;
/**
* Accepts Socket connections on the given port and creates a handler for each connection to
@ -52,15 +53,16 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
private final EventFactory<E> eventFactory;
private final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory;
private final BlockingQueue<ByteBuffer> bufferPool;
private final ByteBufferSource bufferSource;
private final BlockingQueue<E> events;
private final ComponentLog logger;
private final int maxConnections;
private final int maxThreadPoolSize;
private final SSLContext sslContext;
private final ClientAuth clientAuth;
private final Charset charset;
private ExecutorService executor;
private ThreadPoolExecutor executor;
private volatile boolean stopped = false;
private Selector selector;
private final BlockingQueue<SelectionKey> keyQueue;
@ -68,45 +70,63 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
final BlockingQueue<ByteBuffer> bufferPool,
final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final int maxConnections,
final SSLContext sslContext,
final Charset charset) {
this(eventFactory, handlerFactory, bufferPool, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset);
this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset);
}
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
final BlockingQueue<ByteBuffer> bufferPool,
final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final int maxConnections,
final SSLContext sslContext,
final ClientAuth clientAuth,
final Charset charset) {
this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, maxConnections, sslContext, clientAuth, charset);
}
public SocketChannelDispatcher(final EventFactory<E> eventFactory,
final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
final ByteBufferSource bufferSource,
final BlockingQueue<E> events,
final ComponentLog logger,
final int maxConnections,
final int maxThreadPoolSize,
final SSLContext sslContext,
final ClientAuth clientAuth,
final Charset charset) {
this.eventFactory = eventFactory;
this.handlerFactory = handlerFactory;
this.bufferPool = bufferPool;
this.bufferSource = bufferSource;
this.events = events;
this.logger = logger;
this.maxConnections = maxConnections;
this.maxThreadPoolSize = maxThreadPoolSize;
this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
this.sslContext = sslContext;
this.clientAuth = clientAuth;
this.charset = charset;
if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) {
throw new IllegalArgumentException(
"A pool of available ByteBuffers equal to the maximum number of connections is required");
}
}
@Override
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
final InetSocketAddress inetSocketAddress = new InetSocketAddress(nicAddress, port);
stopped = false;
executor = Executors.newFixedThreadPool(maxConnections);
executor = new ThreadPoolExecutor(
maxThreadPoolSize,
maxThreadPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + "-worker-%d").build());
executor.allowCoreThreadTimeOut(true);
final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
@ -120,7 +140,7 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
}
}
serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port));
serverSocketChannel.socket().bind(inetSocketAddress);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
@ -161,9 +181,7 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
// Prepare the byte buffer for the reads, clear it out
ByteBuffer buffer = bufferPool.poll();
buffer.clear();
buffer.mark();
ByteBuffer buffer = bufferSource.acquire();
// If we have an SSLContext then create an SSLEngine for the channel
SSLSocketChannel sslSocketChannel = null;
@ -265,13 +283,9 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements
@Override
public void completeConnection(SelectionKey key) {
// connection is done. Return the buffer to the pool
SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
try {
bufferPool.put(attachment.getByteBuffer());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// connection is done. Releasing buffer
final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
bufferSource.release(attachment.getByteBuffer());
currentConnections.decrementAndGet();
}

View File

@ -17,17 +17,6 @@
package org.apache.nifi.processors.lumberjack;
import com.google.gson.Gson;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@Deprecated
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "lumberjack", "tcp", "logs"})
@ -135,7 +137,7 @@ public class ListenLumberjack extends AbstractListenEventBatchingProcessor<Lumbe
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
@ -145,7 +147,7 @@ public class ListenLumberjack extends AbstractListenEventBatchingProcessor<Lumbe
}
// if we decide to support SSL then get the context and pass it in here
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events,
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events,
getLogger(), maxConnections, sslContext, charSet);
}

View File

@ -16,21 +16,10 @@
*/
package org.apache.nifi.processors.lumberjack.handler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import javax.xml.bind.DatatypeConverter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
@ -43,11 +32,25 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
import javax.xml.bind.DatatypeConverter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@SuppressWarnings("deprecation")
public class ITLumberjackSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
private BlockingQueue<ByteBuffer> byteBuffers;
private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
@ -60,8 +63,7 @@ public class ITLumberjackSocketChannelHandler {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();
byteBuffers = new LinkedBlockingQueue<>();
byteBuffers.add(ByteBuffer.allocate(4096));
byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
@ -70,7 +72,7 @@ public class ITLumberjackSocketChannelHandler {
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}

View File

@ -16,17 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "relp", "tcp", "logs"})
@CapabilityDescription("Listens for RELP messages being sent to a given port over TCP. Each message will be " +
@ -135,7 +137,7 @@ public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent>
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
@ -150,7 +152,7 @@ public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent>
}
// if we decide to support SSL then get the context and pass it in here
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events,
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events,
getLogger(), maxConnections, sslContext, clientAuth, charSet);
}

View File

@ -16,28 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -62,6 +40,8 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
@ -77,6 +57,28 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@ -202,7 +204,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private volatile ChannelDispatcher channelDispatcher;
private volatile SyslogParser parser;
private volatile BlockingQueue<ByteBuffer> bufferPool;
private volatile ByteBufferSource byteBufferSource;
private volatile BlockingQueue<RawSyslogEvent> syslogEvents;
private final BlockingQueue<RawSyslogEvent> errorEvents = new LinkedBlockingQueue<>();
private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents.
@ -303,11 +305,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
}
bufferPool = new LinkedBlockingQueue<>(maxConnections);
for (int i = 0; i < maxConnections; i++) {
bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
parser = new SyslogParser(Charset.forName(charSet));
syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
@ -319,7 +317,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// create either a UDP or TCP reader and call open() to bind to the given port
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
channelDispatcher = createChannelReader(context, protocol, bufferPool, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
channelDispatcher = createChannelReader(context, protocol, byteBufferSource, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelDispatcher);
@ -334,14 +332,14 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
// visible for testing to be overridden and provide a mock ChannelDispatcher if desired
protected ChannelDispatcher createChannelReader(final ProcessContext context, final String protocol, final BlockingQueue<ByteBuffer> bufferPool,
protected ChannelDispatcher createChannelReader(final ProcessContext context, final String protocol, final ByteBufferSource byteBufferSource,
final BlockingQueue<RawSyslogEvent> events, final int maxConnections,
final SSLContextService sslContextService, final Charset charset) throws IOException {
final EventFactory<RawSyslogEvent> eventFactory = new RawSyslogEventFactory();
if (UDP_VALUE.getValue().equals(protocol)) {
return new DatagramChannelDispatcher(eventFactory, bufferPool, events, getLogger());
return new DatagramChannelDispatcher(eventFactory, byteBufferSource, events, getLogger());
} else {
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
@ -354,7 +352,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charset);
return new SocketChannelDispatcher(eventFactory, handlerFactory, byteBufferSource, events, getLogger(), maxConnections, sslContext, clientAuth, charset);
}
}

View File

@ -16,18 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -40,8 +28,12 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferFactory;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@ -53,6 +45,18 @@ import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "tcp", "tls", "ssl"})
@ -83,10 +87,37 @@ public class ListenTCP extends AbstractListenEventBatchingProcessor<StandardEven
.defaultValue(ClientAuth.REQUIRED.name())
.build();
public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
.name("max-receiving-threads")
.displayName("Max Number of Receiving Message Handler Threads")
.description(
"The maximum number of threads might be available for handling receiving messages ready all the time. " +
"Cannot be bigger than the \"Max Number of TCP Connections\". " +
"If not set, the value of \"Max Number of TCP Connections\" will be used.")
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.required(false)
.build();
protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
.name("pool-receive-buffers")
.displayName("Pool Receive Buffers")
.description(
"When turned on, the processor uses pre-populated pool of buffers when receiving messages. " +
"This is prepared during initialisation of the processor. " +
"With high value of Max Number of TCP Connections and Receive Buffer Size this strategy might allocate significant amount of memory! " +
"When turned off, the byte buffers will be created on demand and be destroyed after use.")
.required(true)
.defaultValue("True")
.allowableValues("True", "False")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
MAX_CONNECTIONS,
MAX_RECV_THREAD_POOL_SIZE,
POOL_RECV_BUFFERS,
SSL_CONTEXT_SERVICE,
CLIENT_AUTH
);
@ -105,6 +136,20 @@ public class ListenTCP extends AbstractListenEventBatchingProcessor<StandardEven
.valid(false).subject("Client Auth").build());
}
final int maxConnections = validationContext.getProperty(MAX_CONNECTIONS).asInteger();
if (validationContext.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()) {
final int maxPoolSize = validationContext.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger();
if (maxPoolSize > maxConnections) {
results.add(new ValidationResult.Builder()
.explanation("\"" + MAX_RECV_THREAD_POOL_SIZE.getDisplayName() + "\" cannot be bigger than \"" + MAX_CONNECTIONS.getDisplayName() + "\"")
.valid(false)
.subject(MAX_RECV_THREAD_POOL_SIZE.getDisplayName())
.build());
}
}
return results;
}
@ -113,11 +158,17 @@ public class ListenTCP extends AbstractListenEventBatchingProcessor<StandardEven
throws IOException {
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
final int maxThreadPoolSize = context.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()
? context.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger()
: maxConnections;
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
final ByteBufferSource byteBufferSource = context.getProperty(POOL_RECV_BUFFERS).asBoolean()
? new ByteBufferPool(maxConnections, bufferSize)
: new ByteBufferFactory(bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
@ -132,7 +183,8 @@ public class ListenTCP extends AbstractListenEventBatchingProcessor<StandardEven
final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
final ChannelHandlerFactory<StandardEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charSet);
return new SocketChannelDispatcher(eventFactory, handlerFactory, byteBufferSource, events, getLogger(), maxConnections,
maxThreadPoolSize, sslContext, clientAuth, charSet);
}
@Override

View File

@ -33,6 +33,8 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@ -42,7 +44,6 @@ import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -124,9 +125,9 @@ public class ListenUDP extends AbstractListenEventBatchingProcessor<StandardEven
final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
final ByteBufferSource byteBufferSource = new ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
return new DatagramChannelDispatcher<>(eventFactory, bufferPool, events, getLogger(), sendingHost, sendingHostPort);
return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, events, getLogger(), sendingHost, sendingHostPort);
}
@Override

View File

@ -39,6 +39,8 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
@ -58,7 +60,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -214,9 +215,9 @@ public class ListenUDPRecord extends AbstractListenEventProcessor<StandardEvent>
final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
final ByteBufferSource byteBufferSource = new ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
return new DatagramChannelDispatcher<>(eventFactory, bufferPool, events, getLogger(), sendingHost, sendingHostPort);
return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, events, getLogger(), sendingHost, sendingHostPort);
}
@Override

View File

@ -19,6 +19,8 @@ package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
@ -48,7 +50,7 @@ public class TestRELPSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
private BlockingQueue<ByteBuffer> byteBuffers;
private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
@ -61,8 +63,7 @@ public class TestRELPSocketChannelHandler {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new RELPSocketChannelHandlerFactory<>();
byteBuffers = new LinkedBlockingQueue<>();
byteBuffers.add(ByteBuffer.allocate(4096));
byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
@ -71,7 +72,7 @@ public class TestRELPSocketChannelHandler {
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}