NIFI-274 - added use of Selectors for TCP and UDP connections. Added a max connections to the TCP thread

- Added comments and code review changes
         - fixed fixbugs bug
This commit is contained in:
Tony Kurc 2015-10-30 08:45:06 -04:00
parent 9c542432da
commit 5611dac3f8
3 changed files with 245 additions and 127 deletions

View File

@ -16,6 +16,34 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@ -40,29 +68,6 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@ -104,7 +109,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.defaultValue("1 MB")
.required(true)
.build();
public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max number of TCP connections")
.description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2")
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -132,6 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(CHARSET);
descriptors.add(MAX_CONNECTIONS);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
@ -168,14 +180,21 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
final int maxConnections;
if (protocol.equals(UDP_VALUE.getValue())) {
maxConnections = 1;
} else{
maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
}
parser = new SyslogParser(Charset.forName(charSet));
bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
syslogEvents = new LinkedBlockingQueue<>(10);
errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
// create either a UDP or TCP reader and call open() to bind to the given port
channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections);
channelReader.open(port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelReader);
@ -185,12 +204,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
// visible for testing to be overridden and provide a mock ChannelReader if desired
protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
throws IOException {
if (protocol.equals(UDP_VALUE.getValue())) {
return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
} else {
return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections);
}
}
@ -287,6 +306,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final ProcessorLog logger;
private DatagramChannel datagramChannel;
private volatile boolean stopped = false;
private Selector selector;
public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
final ProcessorLog logger) {
@ -308,37 +328,48 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
datagramChannel.socket().bind(new InetSocketAddress(port));
selector = Selector.open();
datagramChannel.register(selector, SelectionKey.OP_READ);
}
@Override
public void run() {
final ByteBuffer buffer = bufferPool.poll();
while (!stopped) {
final ByteBuffer buffer = bufferPool.poll();
try {
if (buffer == null) {
Thread.sleep(10L);
logger.debug("no available buffers, continuing...");
continue;
}
final SocketAddress sender = datagramChannel.receive(buffer);
if (sender == null) {
Thread.sleep(1000L); // nothing to do so wait...
} else {
final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
logger.trace(event.getFullMessage());
syslogEvents.put(event); // block until space is available
int selected = selector.select();
if (selected > 0){
Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
while (selectorKeys.hasNext()) {
SelectionKey key = selectorKeys.next();
selectorKeys.remove();
if (!key.isValid()) {
continue;
}
DatagramChannel channel = (DatagramChannel) key.channel();
SocketAddress sender;
buffer.clear();
while (!stopped && (sender = channel.receive(buffer)) != null) {
final SyslogEvent event;
if (sender instanceof InetSocketAddress) {
event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString());
} else {
event = syslogParser.parseEvent(buffer);
}
logger.trace(event.getFullMessage());
syslogEvents.put(event); // block until space is available
}
}
}
} catch (InterruptedException e) {
stop();
stopped = true;
} catch (IOException e) {
logger.error("Error reading from DatagramChannel", e);
} finally {
if (buffer != null) {
bufferPool.returnBuffer(buffer, 0);
}
}
}
if (buffer != null) {
bufferPool.returnBuffer(buffer, 0);
}
}
@Override
@ -348,11 +379,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void stop() {
selector.wakeup();
stopped = true;
}
@Override
public void close() {
IOUtils.closeQuietly(selector);
IOUtils.closeQuietly(datagramChannel);
}
}
@ -367,21 +400,27 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
private ServerSocketChannel serverSocketChannel;
private ExecutorService executor = Executors.newFixedThreadPool(2);
private final ExecutorService executor;
private volatile boolean stopped = false;
private Selector selector;
private final BlockingQueue<SelectionKey> keyQueue;
private final int maxConnections;
private final AtomicInteger currentConnections = new AtomicInteger(0);
public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
final ProcessorLog logger) {
final ProcessorLog logger, final int maxConnections) {
this.bufferPool = bufferPool;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
this.maxConnections = maxConnections;
this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
this.executor = Executors.newFixedThreadPool(maxConnections);
}
@Override
public void open(final int port, int maxBufferSize) throws IOException {
serverSocketChannel = ServerSocketChannel.open();
final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
if (maxBufferSize > 0) {
serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
@ -391,42 +430,85 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
serverSocketChannel.socket().bind(new InetSocketAddress(port));
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while (!stopped) {
try {
final SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel == null) {
Thread.sleep(1000L); // wait for an incoming connection...
} else {
final SocketChannelHandler handler = new SocketChannelHandler(
bufferPool, socketChannel, syslogParser, syslogEvents, logger);
logger.debug("Accepted incoming connection");
executor.submit(handler);
int selected = selector.select();
if (selected > 0){
Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
while (selectorKeys.hasNext()){
SelectionKey key = selectorKeys.next();
selectorKeys.remove();
if (!key.isValid()){
continue;
}
if (key.isAcceptable()) {
// Handle new connections coming in
final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
final SocketChannel socketChannel = channel.accept();
// Check for available connections
if (currentConnections.incrementAndGet() > maxConnections){
currentConnections.decrementAndGet();
logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
IOUtils.closeQuietly(socketChannel);
continue;
}
logger.debug("Accepted incoming connection from {}",
new Object[]{socketChannel.getRemoteAddress().toString()} );
// Set socket to non-blocking, and register with selector
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
// Prepare the byte buffer for the reads, clear it out and attach to key
ByteBuffer buffer = bufferPool.poll();
buffer.clear();
buffer.mark();
readKey.attach(buffer);
} else if (key.isReadable()) {
// Clear out the operations the select is interested in until done reading
key.interestOps(0);
// Create and execute the read handler
final SocketChannelHandler handler = new SocketChannelHandler(key, this,
syslogParser, syslogEvents, logger);
// and launch the thread
executor.execute(handler);
}
}
}
// Add back all idle sockets to the select
SelectionKey key;
while((key = keyQueue.poll()) != null){
key.interestOps(SelectionKey.OP_READ);
}
} catch (IOException e) {
logger.error("Error accepting connection from SocketChannel", e);
} catch (InterruptedException e) {
stop();
}
}
}
@Override
public int getPort() {
return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
// Return the port for the key listening for accepts
for(SelectionKey key : selector.keys()){
if (key.isValid() && key.isAcceptable()) {
return ((SocketChannel)key.channel()).socket().getLocalPort();
}
}
return 0;
}
@Override
public void stop() {
stopped = true;
selector.wakeup();
}
@Override
public void close() {
IOUtils.closeQuietly(serverSocketChannel);
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
@ -439,6 +521,21 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// Preserve interrupt status
Thread.currentThread().interrupt();
}
for(SelectionKey key : selector.keys()){
IOUtils.closeQuietly(key.channel());
}
IOUtils.closeQuietly(selector);
}
public void completeConnection(SelectionKey key) {
// connection is done. Return the buffer to the pool
bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
currentConnections.decrementAndGet();
}
public void addBackForSelection(SelectionKey key) {
keyQueue.offer(key);
selector.wakeup();
}
}
@ -449,17 +546,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
*/
public static class SocketChannelHandler implements Runnable {
private final BufferPool bufferPool;
private final SocketChannel socketChannel;
private final SelectionKey key;
private final SocketChannelReader dispatcher;
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser,
final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
this.bufferPool = bufferPool;
this.socketChannel = socketChannel;
this.key = key;
this.dispatcher = dispatcher;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
@ -467,55 +564,72 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void run() {
boolean eof = false;
SocketChannel socketChannel = null;
ByteBuffer socketBuffer = null;
try {
int bytesRead = 0;
while (bytesRead >= 0 && !Thread.interrupted()) {
int bytesRead;
socketChannel = (SocketChannel) key.channel();
socketBuffer = (ByteBuffer) key.attachment();
// read until the buffer is full
while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
// prepare byte buffer for reading
socketBuffer.flip();
// mark the current position as start, in case of partial message read
socketBuffer.mark();
final ByteBuffer buffer = bufferPool.poll();
if (buffer == null) {
Thread.sleep(10L);
logger.debug("no available buffers, continuing...");
continue;
}
// get total bytes in buffer
int total = socketBuffer.remaining();
// go through the buffer looking for the end of each message
currBytes.reset();
for (int i = 0; i < total; i++) {
// NOTE: For higher throughput, the looking for \n and copying into the byte
// stream could be improved
// Pull data out of buffer and cram into byte array
byte currByte = socketBuffer.get();
currBytes.write(currByte);
try {
// read until the buffer is full
bytesRead = socketChannel.read(buffer);
while (bytesRead > 0) {
bytesRead = socketChannel.read(buffer);
// check if at end of a message
if (currByte == '\n') {
// parse an event, reset the buffer
final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
socketChannel.socket().getInetAddress().toString());
logger.trace(event.getFullMessage());
syslogEvents.put(event); // block until space is available
currBytes.reset();
// Mark this as the start of the next message
socketBuffer.mark();
}
buffer.flip();
// go through the buffer looking for the end of each message
int bufferLength = buffer.limit();
for (int i = 0; i < bufferLength; i++) {
byte currByte = buffer.get(i);
currBytes.write(currByte);
// at the end of a message so parse an event, reset the buffer, and break out of the loop
if (currByte == '\n') {
final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
socketChannel.socket().getInetAddress().toString());
logger.trace(event.getFullMessage());
syslogEvents.put(event); // block until space is available
currBytes.reset();
}
}
} finally {
bufferPool.returnBuffer(buffer, 0);
}
// Preserve bytes in buffer for next call to run
// NOTE: This code could benefit from the two ByteBuffer read calls to avoid
// this compact for higher throughput
socketBuffer.reset();
socketBuffer.compact();
logger.debug("done handling SocketChannel");
}
// Check for closed socket
if( bytesRead < 0 ){
eof = true;
}
logger.debug("done handling SocketChannel");
} catch (ClosedByInterruptException | InterruptedException e) {
// nothing to do here
logger.debug("read loop interrupted, closing connection");
// Treat same as closed socket
eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
// Treat same as closed socket
eof = true;
} finally {
IOUtils.closeQuietly(socketChannel);
if(eof == true) {
IOUtils.closeQuietly(socketChannel);
dispatcher.completeConnection(key);
} else {
dispatcher.addBackForSelection(key);
}
}
}
}
static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {

View File

@ -225,6 +225,29 @@ public class PutSyslog extends AbstractSyslogProcessor {
}
}
private void pruneIdleSenders(final long idleThreshold){
long currentTime = System.currentTimeMillis();
final List<ChannelSender> putBack = new ArrayList<>();
// if a connection hasn't been used with in the threshold then it gets closed
ChannelSender sender;
while ((sender = senderPool.poll()) != null) {
if (currentTime > (sender.lastUsed + idleThreshold)) {
getLogger().debug("Closing idle connection...");
sender.close();
} else {
putBack.add(sender);
}
}
// re-queue senders that weren't idle, but if the queue is full then close the sender
for (ChannelSender putBackSender : putBack) {
boolean returned = senderPool.offer(putBackSender);
if (!returned) {
putBackSender.close();
}
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final String protocol = context.getProperty(PROTOCOL).getValue();
@ -232,27 +255,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.isEmpty()) {
final List<ChannelSender> putBack = new ArrayList<>();
final long expirationThreshold = context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
// if a connection hasn't been used with in the threshold then it gets closed
ChannelSender sender;
while ((sender = senderPool.poll()) != null) {
if (System.currentTimeMillis() > (sender.lastUsed + expirationThreshold)) {
getLogger().debug("Closing idle connection...");
sender.close();
} else {
putBack.add(sender);
}
}
// re-queue senders that weren't idle, but if the queue is full then close the sender
for (ChannelSender putBackSender : putBack) {
boolean returned = senderPool.offer(putBackSender);
if (!returned) {
putBackSender.close();
}
}
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
return;
}

View File

@ -391,7 +391,8 @@ public class TestListenSyslog {
}
@Override
protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException {
protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser,
final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
return new ChannelReader() {
@Override
public void open(int port, int maxBufferSize) throws IOException {