diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 9f57c9f063..8012b882f5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -16,6 +16,34 @@ */ package org.apache.nifi.processors.standard; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -40,29 +68,6 @@ import org.apache.nifi.processors.standard.util.SyslogEvent; import org.apache.nifi.processors.standard.util.SyslogParser; import org.apache.nifi.stream.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.DatagramChannel; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " + @@ -104,7 +109,13 @@ public class ListenSyslog extends AbstractSyslogProcessor { .defaultValue("1 MB") .required(true) .build(); - + public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() + .name("Max number of TCP connections") + .description("The maximum number of concurrent connections to accept syslog messages in TCP mode") + .addValidator(StandardValidators.createLongValidator(1, 65535, true)) + .defaultValue("2") + .required(true) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -132,6 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_SOCKET_BUFFER_SIZE); descriptors.add(CHARSET); + descriptors.add(MAX_CONNECTIONS); this.descriptors = Collections.unmodifiableList(descriptors); final Set relationships = new HashSet<>(); @@ -168,14 +180,21 @@ public class ListenSyslog extends AbstractSyslogProcessor { final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); final String charSet = context.getProperty(CHARSET).getValue(); + final int maxConnections; + + if (protocol.equals(UDP_VALUE.getValue())) { + maxConnections = 1; + } else{ + maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); + } parser = new SyslogParser(Charset.forName(charSet)); - bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE); + bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE); syslogEvents = new LinkedBlockingQueue<>(10); errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); // create either a UDP or TCP reader and call open() to bind to the given port - channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents); + channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections); channelReader.open(port, maxChannelBufferSize); final Thread readerThread = new Thread(channelReader); @@ -185,12 +204,12 @@ public class ListenSyslog extends AbstractSyslogProcessor { } // visible for testing to be overridden and provide a mock ChannelReader if desired - protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue syslogEvents) + protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue syslogEvents, int maxConnections) throws IOException { if (protocol.equals(UDP_VALUE.getValue())) { return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger()); } else { - return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger()); + return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections); } } @@ -287,6 +306,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { private final ProcessorLog logger; private DatagramChannel datagramChannel; private volatile boolean stopped = false; + private Selector selector; public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue syslogEvents, final ProcessorLog logger) { @@ -308,37 +328,48 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } datagramChannel.socket().bind(new InetSocketAddress(port)); + selector = Selector.open(); + datagramChannel.register(selector, SelectionKey.OP_READ); } @Override public void run() { + final ByteBuffer buffer = bufferPool.poll(); while (!stopped) { - final ByteBuffer buffer = bufferPool.poll(); try { - if (buffer == null) { - Thread.sleep(10L); - logger.debug("no available buffers, continuing..."); - continue; - } - - final SocketAddress sender = datagramChannel.receive(buffer); - if (sender == null) { - Thread.sleep(1000L); // nothing to do so wait... - } else { - final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender? - logger.trace(event.getFullMessage()); - syslogEvents.put(event); // block until space is available + int selected = selector.select(); + if (selected > 0){ + Iterator selectorKeys = selector.selectedKeys().iterator(); + while (selectorKeys.hasNext()) { + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()) { + continue; + } + DatagramChannel channel = (DatagramChannel) key.channel(); + SocketAddress sender; + buffer.clear(); + while (!stopped && (sender = channel.receive(buffer)) != null) { + final SyslogEvent event; + if (sender instanceof InetSocketAddress) { + event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString()); + } else { + event = syslogParser.parseEvent(buffer); + } + logger.trace(event.getFullMessage()); + syslogEvents.put(event); // block until space is available + } + } } } catch (InterruptedException e) { - stop(); + stopped = true; } catch (IOException e) { logger.error("Error reading from DatagramChannel", e); - } finally { - if (buffer != null) { - bufferPool.returnBuffer(buffer, 0); - } } } + if (buffer != null) { + bufferPool.returnBuffer(buffer, 0); + } } @Override @@ -348,11 +379,13 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void stop() { + selector.wakeup(); stopped = true; } @Override public void close() { + IOUtils.closeQuietly(selector); IOUtils.closeQuietly(datagramChannel); } } @@ -367,21 +400,27 @@ public class ListenSyslog extends AbstractSyslogProcessor { private final SyslogParser syslogParser; private final BlockingQueue syslogEvents; private final ProcessorLog logger; - private ServerSocketChannel serverSocketChannel; - private ExecutorService executor = Executors.newFixedThreadPool(2); + private final ExecutorService executor; private volatile boolean stopped = false; + private Selector selector; + private final BlockingQueue keyQueue; + private final int maxConnections; + private final AtomicInteger currentConnections = new AtomicInteger(0); public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue syslogEvents, - final ProcessorLog logger) { + final ProcessorLog logger, final int maxConnections) { this.bufferPool = bufferPool; this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; + this.maxConnections = maxConnections; + this.keyQueue = new LinkedBlockingQueue<>(maxConnections); + this.executor = Executors.newFixedThreadPool(maxConnections); } @Override public void open(final int port, int maxBufferSize) throws IOException { - serverSocketChannel = ServerSocketChannel.open(); + final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); if (maxBufferSize > 0) { serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); @@ -391,42 +430,85 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } serverSocketChannel.socket().bind(new InetSocketAddress(port)); + selector = Selector.open(); + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } @Override public void run() { while (!stopped) { try { - final SocketChannel socketChannel = serverSocketChannel.accept(); - if (socketChannel == null) { - Thread.sleep(1000L); // wait for an incoming connection... - } else { - final SocketChannelHandler handler = new SocketChannelHandler( - bufferPool, socketChannel, syslogParser, syslogEvents, logger); - logger.debug("Accepted incoming connection"); - executor.submit(handler); + int selected = selector.select(); + if (selected > 0){ + Iterator selectorKeys = selector.selectedKeys().iterator(); + while (selectorKeys.hasNext()){ + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()){ + continue; + } + if (key.isAcceptable()) { + // Handle new connections coming in + final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); + final SocketChannel socketChannel = channel.accept(); + // Check for available connections + if (currentConnections.incrementAndGet() > maxConnections){ + currentConnections.decrementAndGet(); + logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() }); + IOUtils.closeQuietly(socketChannel); + continue; + } + logger.debug("Accepted incoming connection from {}", + new Object[]{socketChannel.getRemoteAddress().toString()} ); + // Set socket to non-blocking, and register with selector + socketChannel.configureBlocking(false); + SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + // Prepare the byte buffer for the reads, clear it out and attach to key + ByteBuffer buffer = bufferPool.poll(); + buffer.clear(); + buffer.mark(); + readKey.attach(buffer); + } else if (key.isReadable()) { + // Clear out the operations the select is interested in until done reading + key.interestOps(0); + // Create and execute the read handler + final SocketChannelHandler handler = new SocketChannelHandler(key, this, + syslogParser, syslogEvents, logger); + // and launch the thread + executor.execute(handler); + } + } + } + // Add back all idle sockets to the select + SelectionKey key; + while((key = keyQueue.poll()) != null){ + key.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { logger.error("Error accepting connection from SocketChannel", e); - } catch (InterruptedException e) { - stop(); } } } @Override public int getPort() { - return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort(); + // Return the port for the key listening for accepts + for(SelectionKey key : selector.keys()){ + if (key.isValid() && key.isAcceptable()) { + return ((SocketChannel)key.channel()).socket().getLocalPort(); + } + } + return 0; } @Override public void stop() { stopped = true; + selector.wakeup(); } @Override public void close() { - IOUtils.closeQuietly(serverSocketChannel); executor.shutdown(); try { // Wait a while for existing tasks to terminate @@ -439,6 +521,21 @@ public class ListenSyslog extends AbstractSyslogProcessor { // Preserve interrupt status Thread.currentThread().interrupt(); } + for(SelectionKey key : selector.keys()){ + IOUtils.closeQuietly(key.channel()); + } + IOUtils.closeQuietly(selector); + } + + public void completeConnection(SelectionKey key) { + // connection is done. Return the buffer to the pool + bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0); + currentConnections.decrementAndGet(); + } + + public void addBackForSelection(SelectionKey key) { + keyQueue.offer(key); + selector.wakeup(); } } @@ -449,17 +546,17 @@ public class ListenSyslog extends AbstractSyslogProcessor { */ public static class SocketChannelHandler implements Runnable { - private final BufferPool bufferPool; - private final SocketChannel socketChannel; + private final SelectionKey key; + private final SocketChannelReader dispatcher; private final SyslogParser syslogParser; private final BlockingQueue syslogEvents; private final ProcessorLog logger; private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser, + public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser, final BlockingQueue syslogEvents, final ProcessorLog logger) { - this.bufferPool = bufferPool; - this.socketChannel = socketChannel; + this.key = key; + this.dispatcher = dispatcher; this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; @@ -467,55 +564,72 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void run() { + boolean eof = false; + SocketChannel socketChannel = null; + ByteBuffer socketBuffer = null; + try { - int bytesRead = 0; - while (bytesRead >= 0 && !Thread.interrupted()) { + int bytesRead; + socketChannel = (SocketChannel) key.channel(); + socketBuffer = (ByteBuffer) key.attachment(); + // read until the buffer is full + while ((bytesRead = socketChannel.read(socketBuffer)) > 0) { + // prepare byte buffer for reading + socketBuffer.flip(); + // mark the current position as start, in case of partial message read + socketBuffer.mark(); - final ByteBuffer buffer = bufferPool.poll(); - if (buffer == null) { - Thread.sleep(10L); - logger.debug("no available buffers, continuing..."); - continue; - } + // get total bytes in buffer + int total = socketBuffer.remaining(); + // go through the buffer looking for the end of each message + currBytes.reset(); + for (int i = 0; i < total; i++) { + // NOTE: For higher throughput, the looking for \n and copying into the byte + // stream could be improved + // Pull data out of buffer and cram into byte array + byte currByte = socketBuffer.get(); + currBytes.write(currByte); - try { - // read until the buffer is full - bytesRead = socketChannel.read(buffer); - while (bytesRead > 0) { - bytesRead = socketChannel.read(buffer); + // check if at end of a message + if (currByte == '\n') { + // parse an event, reset the buffer + final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(), + socketChannel.socket().getInetAddress().toString()); + logger.trace(event.getFullMessage()); + syslogEvents.put(event); // block until space is available + currBytes.reset(); + // Mark this as the start of the next message + socketBuffer.mark(); } - buffer.flip(); - - // go through the buffer looking for the end of each message - int bufferLength = buffer.limit(); - for (int i = 0; i < bufferLength; i++) { - byte currByte = buffer.get(i); - currBytes.write(currByte); - - // at the end of a message so parse an event, reset the buffer, and break out of the loop - if (currByte == '\n') { - final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(), - socketChannel.socket().getInetAddress().toString()); - logger.trace(event.getFullMessage()); - syslogEvents.put(event); // block until space is available - currBytes.reset(); - } - } - } finally { - bufferPool.returnBuffer(buffer, 0); } + // Preserve bytes in buffer for next call to run + // NOTE: This code could benefit from the two ByteBuffer read calls to avoid + // this compact for higher throughput + socketBuffer.reset(); + socketBuffer.compact(); + logger.debug("done handling SocketChannel"); + } + // Check for closed socket + if( bytesRead < 0 ){ + eof = true; } - - logger.debug("done handling SocketChannel"); } catch (ClosedByInterruptException | InterruptedException e) { - // nothing to do here + logger.debug("read loop interrupted, closing connection"); + // Treat same as closed socket + eof = true; } catch (IOException e) { logger.error("Error reading from channel", e); + // Treat same as closed socket + eof = true; } finally { - IOUtils.closeQuietly(socketChannel); + if(eof == true) { + IOUtils.closeQuietly(socketChannel); + dispatcher.completeConnection(key); + } else { + dispatcher.addBackForSelection(key); + } } } - } static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index 502b26f3ce..5e558ca71e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -225,6 +225,29 @@ public class PutSyslog extends AbstractSyslogProcessor { } } + private void pruneIdleSenders(final long idleThreshold){ + long currentTime = System.currentTimeMillis(); + final List putBack = new ArrayList<>(); + + // if a connection hasn't been used with in the threshold then it gets closed + ChannelSender sender; + while ((sender = senderPool.poll()) != null) { + if (currentTime > (sender.lastUsed + idleThreshold)) { + getLogger().debug("Closing idle connection..."); + sender.close(); + } else { + putBack.add(sender); + } + } + // re-queue senders that weren't idle, but if the queue is full then close the sender + for (ChannelSender putBackSender : putBack) { + boolean returned = senderPool.offer(putBackSender); + if (!returned) { + putBackSender.close(); + } + } + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final String protocol = context.getProperty(PROTOCOL).getValue(); @@ -232,27 +255,7 @@ public class PutSyslog extends AbstractSyslogProcessor { final List flowFiles = session.get(batchSize); if (flowFiles == null || flowFiles.isEmpty()) { - final List putBack = new ArrayList<>(); - final long expirationThreshold = context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue(); - - // if a connection hasn't been used with in the threshold then it gets closed - ChannelSender sender; - while ((sender = senderPool.poll()) != null) { - if (System.currentTimeMillis() > (sender.lastUsed + expirationThreshold)) { - getLogger().debug("Closing idle connection..."); - sender.close(); - } else { - putBack.add(sender); - } - } - - // re-queue senders that weren't idle, but if the queue is full then close the sender - for (ChannelSender putBackSender : putBack) { - boolean returned = senderPool.offer(putBackSender); - if (!returned) { - putBackSender.close(); - } - } + pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); return; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index 0e0d972b8c..eb71f88369 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -391,7 +391,8 @@ public class TestListenSyslog { } @Override - protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue syslogEvents) throws IOException { + protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, + final BlockingQueue syslogEvents, int maxConnections) { return new ChannelReader() { @Override public void open(int port, int maxBufferSize) throws IOException {