From 4653908b22aebf0f590d2e47c25506ded24ca63e Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Tue, 1 Dec 2015 17:14:52 -0500 Subject: [PATCH] NIFI-1227 Adding SSL/TLS support to Syslog processors -Removing connection creation from PutSyslog onScheduled, and reusing constructor in SSLSocketChannel -Reducing visiblity of inner classes --- .../io/socket/ssl/SSLSocketChannel.java | 8 +- .../processors/standard/ListenSyslog.java | 248 ++++++++++++++---- .../nifi/processors/standard/PutSyslog.java | 100 ++++++- .../processors/standard/TestListenSyslog.java | 51 ++++ .../processors/standard/TestPutSyslog.java | 7 +- 5 files changed, 345 insertions(+), 69 deletions(-) diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java index 1f23d790a2..408eb5968e 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java @@ -85,6 +85,10 @@ public class SSLSocketChannel implements Closeable { } public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException { + this(sslContext.createSSLEngine(), socketChannel, client); + } + + public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel socketChannel, final boolean client) throws IOException { if (!socketChannel.isConnected()) { throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel"); } @@ -96,9 +100,9 @@ public class SSLSocketChannel implements Closeable { this.hostname = socket.getInetAddress().getHostName(); this.port = socket.getPort(); - this.engine = sslContext.createSSLEngine(); + this.engine = sslEngine; this.engine.setUseClientMode(client); - engine.setNeedClientAuth(true); + this.engine.setNeedClientAuth(true); streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index fbe64ea523..e1e0e91407 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -16,6 +16,38 @@ */ package org.apache.nifi.processors.standard; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.SyslogEvent; +import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.ByteArrayOutputStream; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; @@ -46,34 +78,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.io.IOUtils; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.SyslogEvent; -import org.apache.nifi.processors.standard.util.SyslogParser; -import org.apache.nifi.stream.io.ByteArrayOutputStream; - @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @@ -129,8 +133,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() .name("Max Batch Size") .description( - "The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with " - + "the up to this configured maximum number of messages") + "The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with " + + "the up to this configured maximum number of messages") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .expressionLanguageSupported(false) .defaultValue("1") @@ -151,6 +155,13 @@ public class ListenSyslog extends AbstractSyslogProcessor { .defaultValue("true") .required(true) .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " + + "messages will be received over a secure connection.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -176,6 +187,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { final List descriptors = new ArrayList<>(); descriptors.add(PROTOCOL); descriptors.add(PORT); + descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_SOCKET_BUFFER_SIZE); descriptors.add(MAX_CONNECTIONS); @@ -222,6 +234,16 @@ public class ListenSyslog extends AbstractSyslogProcessor { results.add(new ValidationResult.Builder().subject("Parse Messages").input("true").valid(false) .explanation("Cannot set Parse Messages to 'true' if Batch Size is greater than 1").build()); } + + final String protocol = validationContext.getProperty(PROTOCOL).getValue(); + final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) { + results.add(new ValidationResult.Builder() + .explanation("SSL can not be used with UDP") + .valid(false).subject("SSL Context").build()); + } + return results; } @@ -251,7 +273,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { parser = new SyslogParser(Charset.forName(charSet)); // create either a UDP or TCP reader and call open() to bind to the given port - channelReader = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + channelReader = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections, sslContextService); channelReader.open(port, maxChannelBufferSize); final Thread readerThread = new Thread(channelReader); @@ -267,12 +290,18 @@ public class ListenSyslog extends AbstractSyslogProcessor { // visible for testing to be overridden and provide a mock ChannelReader if desired protected ChannelReader createChannelReader(final String protocol, final BlockingQueue bufferPool, final BlockingQueue syslogEvents, - int maxConnections) + int maxConnections, final SSLContextService sslContextService) throws IOException { if (protocol.equals(UDP_VALUE.getValue())) { return new DatagramChannelReader(bufferPool, syslogEvents, getLogger()); } else { - return new SocketChannelReader(bufferPool, syslogEvents, getLogger(), maxConnections); + // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher + SSLContext sslContext = null; + if (sslContextService != null) { + sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + } + + return new SocketChannelDispatcher(bufferPool, syslogEvents, getLogger(), maxConnections, sslContext); } } @@ -468,7 +497,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { /** * Reads messages from a channel until told to stop. */ - public interface ChannelReader extends Runnable { + private interface ChannelReader extends Runnable { void open(int port, int maxBufferSize) throws IOException; @@ -483,7 +512,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { * Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for * processing, otherwise the buffer is returned to the buffer pool. */ - public static class DatagramChannelReader implements ChannelReader { + private static class DatagramChannelReader implements ChannelReader { private final BlockingQueue bufferPool; private final BlockingQueue syslogEvents; @@ -586,7 +615,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { * Accepts Socket connections on the given port and creates a handler for each connection to * be executed by a thread pool. */ - public static class SocketChannelReader implements ChannelReader { + private static class SocketChannelDispatcher implements ChannelReader { private final BlockingQueue bufferPool; private final BlockingQueue syslogEvents; @@ -597,13 +626,16 @@ public class ListenSyslog extends AbstractSyslogProcessor { private final BlockingQueue keyQueue; private final int maxConnections; private final AtomicInteger currentConnections = new AtomicInteger(0); + private final SSLContext sslContext; - public SocketChannelReader(final BlockingQueue bufferPool, final BlockingQueue syslogEvents, final ProcessorLog logger, final int maxConnections) { + public SocketChannelDispatcher(final BlockingQueue bufferPool, final BlockingQueue syslogEvents, + final ProcessorLog logger, final int maxConnections, final SSLContext sslContext) { this.bufferPool = bufferPool; this.syslogEvents = syslogEvents; this.logger = logger; this.maxConnections = maxConnections; this.keyQueue = new LinkedBlockingQueue<>(maxConnections); + this.sslContext = sslContext; this.executor = Executors.newFixedThreadPool(maxConnections); } @@ -649,21 +681,36 @@ public class ListenSyslog extends AbstractSyslogProcessor { continue; } logger.debug("Accepted incoming connection from {}", - new Object[]{socketChannel.getRemoteAddress().toString()} ); + new Object[]{socketChannel.getRemoteAddress().toString()}); // Set socket to non-blocking, and register with selector socketChannel.configureBlocking(false); SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); - // Prepare the byte buffer for the reads, clear it out and attach to key + + // Prepare the byte buffer for the reads, clear it out ByteBuffer buffer = bufferPool.poll(); buffer.clear(); buffer.mark(); - readKey.attach(buffer); + + // If we have an SSLContext then create an SSLEngine for the channel + SSLEngine sslEngine = null; + if (sslContext != null) { + sslEngine = sslContext.createSSLEngine(); + } + + // Attach the buffer and SSLEngine to the key + SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslEngine); + readKey.attach(attachment); } else if (key.isReadable()) { // Clear out the operations the select is interested in until done reading key.interestOps(0); - // Create and execute the read handler - final SocketChannelHandler handler = new SocketChannelHandler(key, this, syslogEvents, logger); - // and launch the thread + // Create a handler based on whether an SSLEngine was provided or not + final Runnable handler; + if (sslContext != null) { + handler = new SSLSocketChannelHandler(key, this, syslogEvents, logger); + } else { + handler = new SocketChannelHandler(key, this, syslogEvents, logger); + } + // run the handler executor.execute(handler); } } @@ -722,7 +769,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { public void completeConnection(SelectionKey key) { // connection is done. Return the buffer to the pool try { - bufferPool.put((ByteBuffer) key.attachment()); + SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + bufferPool.put(attachment.getByteBuffer()); } catch (InterruptedException e) { // nothing to do here } @@ -740,15 +788,15 @@ public class ListenSyslog extends AbstractSyslogProcessor { * Reads from the given SocketChannel into the provided buffer. If data is read then the buffer is queued for * processing, otherwise the buffer is returned to the buffer pool. */ - public static class SocketChannelHandler implements Runnable { + private static class SocketChannelHandler implements Runnable { private final SelectionKey key; - private final SocketChannelReader dispatcher; + private final SocketChannelDispatcher dispatcher; private final BlockingQueue syslogEvents; private final ProcessorLog logger; private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final BlockingQueue syslogEvents, final ProcessorLog logger) { + public SocketChannelHandler(final SelectionKey key, final SocketChannelDispatcher dispatcher, final BlockingQueue syslogEvents, final ProcessorLog logger) { this.key = key; this.dispatcher = dispatcher; this.syslogEvents = syslogEvents; @@ -759,12 +807,14 @@ public class ListenSyslog extends AbstractSyslogProcessor { public void run() { boolean eof = false; SocketChannel socketChannel = null; - ByteBuffer socketBuffer = null; try { int bytesRead; socketChannel = (SocketChannel) key.channel(); - socketBuffer = (ByteBuffer) key.attachment(); + + SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + ByteBuffer socketBuffer = attachment.getByteBuffer(); + // read until the buffer is full while ((bytesRead = socketChannel.read(socketBuffer)) > 0) { // prepare byte buffer for reading @@ -809,7 +859,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { // Treat same as closed socket eof = true; } catch (IOException e) { - logger.error("Error reading from channel", e); + logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); // Treat same as closed socket eof = true; } finally { @@ -823,6 +873,81 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } + /** + * Wraps a SocketChannel with an SSLSocketChannel for receiving messages over TLS. + */ + private static class SSLSocketChannelHandler implements Runnable { + + private final SelectionKey key; + private final SocketChannelDispatcher dispatcher; + private final BlockingQueue syslogEvents; + private final ProcessorLog logger; + private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); + + public SSLSocketChannelHandler(final SelectionKey key, final SocketChannelDispatcher dispatcher, final BlockingQueue syslogEvents, final ProcessorLog logger) { + this.key = key; + this.dispatcher = dispatcher; + this.syslogEvents = syslogEvents; + this.logger = logger; + } + + @Override + public void run() { + boolean eof = false; + SSLSocketChannel sslSocketChannel = null; + try { + int bytesRead; + final SocketChannel socketChannel = (SocketChannel) key.channel(); + final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + + // wrap the SocketChannel with an SSLSocketChannel using the SSLEngine from the attachment + sslSocketChannel = new SSLSocketChannel(attachment.getSslEngine(), socketChannel, false); + + // SSLSocketChannel deals with byte[] so ByteBuffer isn't used here, but we'll use the size to create a new byte[] + final ByteBuffer socketBuffer = attachment.getByteBuffer(); + byte[] socketBufferArray = new byte[socketBuffer.limit()]; + + // read until no more data + while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) { + // go through the buffer looking for the end of each message + for (int i = 0; i < bytesRead; i++) { + final byte currByte = socketBufferArray[i]; + currBytes.write(currByte); + + // check if at end of a message + if (currByte == '\n') { + final String sender = socketChannel.socket().getInetAddress().toString(); + // queue the raw event blocking until space is available, reset the temporary buffer + syslogEvents.put(new RawSyslogEvent(currBytes.toByteArray(), sender)); + currBytes.reset(); + } + } + logger.debug("done handling SocketChannel"); + } + + // Check for closed socket + if( bytesRead < 0 ){ + eof = true; + } + } catch (ClosedByInterruptException | InterruptedException e) { + logger.debug("read loop interrupted, closing connection"); + // Treat same as closed socket + eof = true; + } catch (IOException e) { + logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); + // Treat same as closed socket + eof = true; + } finally { + if(eof == true) { + IOUtils.closeQuietly(sslSocketChannel); + dispatcher.completeConnection(key); + } else { + dispatcher.addBackForSelection(key); + } + } + } + } + static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) { logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " @@ -830,7 +955,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { } // Wrapper class to pass around the raw message and the host/ip that sent it - public static class RawSyslogEvent { + static class RawSyslogEvent { final byte[] rawMessage; final String sender; @@ -850,4 +975,25 @@ public class ListenSyslog extends AbstractSyslogProcessor { } + // Wrapper class so we can attach a buffer and/or an SSLEngine to the selector key + private static class SocketChannelAttachment { + + private final ByteBuffer byteBuffer; + private final SSLEngine sslEngine; + + public SocketChannelAttachment(ByteBuffer byteBuffer, SSLEngine sslEngine) { + this.byteBuffer = byteBuffer; + this.sslEngine = sslEngine; + } + + public ByteBuffer getByteBuffer() { + return byteBuffer; + } + + public SSLEngine getSslEngine() { + return sslEngine; + } + + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index 9cb65081f9..20899562e4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -25,6 +25,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -34,9 +36,12 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.StopWatch; +import javax.net.ssl.SSLContext; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -45,6 +50,7 @@ import java.nio.channels.DatagramChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -135,6 +141,13 @@ public class PutSyslog extends AbstractSyslogProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " + + "messages will be sent over a secure connection.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -161,6 +174,7 @@ public class PutSyslog extends AbstractSyslogProcessor { descriptors.add(HOSTNAME); descriptors.add(PROTOCOL); descriptors.add(PORT); + descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(IDLE_EXPIRATION); descriptors.add(SEND_BUFFER_SIZE); descriptors.add(BATCH_SIZE); @@ -189,6 +203,22 @@ public class PutSyslog extends AbstractSyslogProcessor { return descriptors; } + @Override + protected Collection customValidate(final ValidationContext context) { + final Collection results = new ArrayList<>(); + + final String protocol = context.getProperty(PROTOCOL).getValue(); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) { + results.add(new ValidationResult.Builder() + .explanation("SSL can not be used with UDP") + .valid(false).subject("SSL Context").build()); + } + + return results; + } + @OnScheduled public void onScheduled(final ProcessContext context) throws IOException { final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); @@ -197,11 +227,8 @@ public class PutSyslog extends AbstractSyslogProcessor { this.bufferPool.offer(ByteBuffer.allocate(bufferSize)); } - // create a pool of senders based on the number of concurrent tasks for this processor + // initialize the queue of senders, one per task, senders will get created on the fly in onTrigger this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); - for (int i=0; i < context.getMaxConcurrentTasks(); i++) { - senderPool.offer(createSender(context, bufferPool)); - } } protected ChannelSender createSender(final ProcessContext context, final BlockingQueue bufferPool) throws IOException { @@ -209,25 +236,35 @@ public class PutSyslog extends AbstractSyslogProcessor { final String host = context.getProperty(HOSTNAME).getValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); final String charSet = context.getProperty(CHARSET).getValue(); - return createSender(protocol, host, port, Charset.forName(charSet), bufferPool); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + return createSender(sslContextService, protocol, host, port, Charset.forName(charSet), bufferPool); } // visible for testing to override and provide a mock sender if desired - protected ChannelSender createSender(final String protocol, final String host, final int port, final Charset charset, final BlockingQueue bufferPool) + protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host, final int port, + final Charset charset, final BlockingQueue bufferPool) throws IOException { if (protocol.equals(UDP_VALUE.getValue())) { return new DatagramChannelSender(host, port, bufferPool, charset); } else { - return new SocketChannelSender(host, port, bufferPool, charset); + // if an SSLContextService is provided then we make a secure sender + if (sslContextService != null) { + final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + return new SSLSocketChannelSender(sslContext, host, port, bufferPool, charset); + } else { + return new SocketChannelSender(host, port, bufferPool, charset); + } } } @OnStopped public void onStopped() { - ChannelSender sender = senderPool.poll(); - while (sender != null) { - sender.close(); - sender = senderPool.poll(); + if (senderPool != null) { + ChannelSender sender = senderPool.poll(); + while (sender != null) { + sender.close(); + sender = senderPool.poll(); + } } } @@ -362,7 +399,7 @@ public class PutSyslog extends AbstractSyslogProcessor { /** * Base class for sending messages over a channel. */ - public static abstract class ChannelSender { + protected static abstract class ChannelSender { final int port; final String host; @@ -418,7 +455,7 @@ public class PutSyslog extends AbstractSyslogProcessor { /** * Sends messages over a DatagramChannel. */ - static class DatagramChannelSender extends ChannelSender { + private static class DatagramChannelSender extends ChannelSender { final DatagramChannel channel; @@ -449,7 +486,7 @@ public class PutSyslog extends AbstractSyslogProcessor { /** * Sends messages over a SocketChannel. */ - static class SocketChannelSender extends ChannelSender { + private static class SocketChannelSender extends ChannelSender { final SocketChannel channel; @@ -477,4 +514,39 @@ public class PutSyslog extends AbstractSyslogProcessor { } } + /** + * Sends messages over an SSLSocketChannel. + */ + private static class SSLSocketChannelSender extends ChannelSender { + + final SSLSocketChannel channel; + + SSLSocketChannelSender(final SSLContext sslContext, final String host, final int port, final BlockingQueue bufferPool, final Charset charset) throws IOException { + super(host, port, bufferPool, charset); + this.channel = new SSLSocketChannel(sslContext, host, port, true); + this.channel.connect(); + } + + @Override + public void send(final String message) throws IOException { + final byte[] bytes = message.getBytes(charset); + channel.write(bytes); + lastUsed = System.currentTimeMillis(); + } + + @Override + public void write(ByteBuffer buffer) throws IOException { + // nothing to do here since we are overriding send() above + } + + @Override + boolean isConnected() { + return channel != null && !channel.isClosed(); + } + + @Override + public void close() { + IOUtils.closeQuietly(channel); + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index 8fdca7f6c8..ff147d7974 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -164,6 +164,57 @@ public class TestListenSyslog { } } + @Test + public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 3; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // send 3 messages as 1 + final String multipleMessages = VALID_MESSAGE + "\n" + VALID_MESSAGE + "\n" + VALID_MESSAGE; + final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds passed + try { + int numTransfered = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransfered < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, numTransfered); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); + + final List events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + @Test public void testTCPMultipleConnection() throws IOException, InterruptedException { final ListenSyslog proc = new ListenSyslog(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java index eb0d3f4a5f..c96d105450 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.processor.Processor; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -326,7 +327,8 @@ public class TestPutSyslog { } @Override - protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue bufferPool) throws IOException { + protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port, + Charset charset, BlockingQueue bufferPool) throws IOException { return mockSender; } } @@ -344,7 +346,8 @@ public class TestPutSyslog { } @Override - protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue bufferPool) throws IOException { + protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port, + Charset charset, BlockingQueue bufferPool) throws IOException { if (numSendersCreated >= numSendersAllowed) { throw new IOException("too many senders"); }