mirror of https://github.com/apache/nifi.git
NIFI-1227 Adding SSL/TLS support to Syslog processors
-Removing connection creation from PutSyslog onScheduled, and reusing constructor in SSLSocketChannel -Reducing visiblity of inner classes
This commit is contained in:
parent
eceb1d20c7
commit
4653908b22
|
@ -85,6 +85,10 @@ public class SSLSocketChannel implements Closeable {
|
|||
}
|
||||
|
||||
public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
|
||||
this(sslContext.createSSLEngine(), socketChannel, client);
|
||||
}
|
||||
|
||||
public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel socketChannel, final boolean client) throws IOException {
|
||||
if (!socketChannel.isConnected()) {
|
||||
throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
|
||||
}
|
||||
|
@ -96,9 +100,9 @@ public class SSLSocketChannel implements Closeable {
|
|||
this.hostname = socket.getInetAddress().getHostName();
|
||||
this.port = socket.getPort();
|
||||
|
||||
this.engine = sslContext.createSSLEngine();
|
||||
this.engine = sslEngine;
|
||||
this.engine.setUseClientMode(client);
|
||||
engine.setNeedClientAuth(true);
|
||||
this.engine.setNeedClientAuth(true);
|
||||
|
||||
streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
|
||||
streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
|
||||
|
|
|
@ -16,6 +16,38 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.SyslogEvent;
|
||||
import org.apache.nifi.processors.standard.util.SyslogParser;
|
||||
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.stream.io.ByteArrayOutputStream;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -46,34 +78,6 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.SyslogEvent;
|
||||
import org.apache.nifi.processors.standard.util.SyslogParser;
|
||||
import org.apache.nifi.stream.io.ByteArrayOutputStream;
|
||||
|
||||
@SupportsBatching
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
||||
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
|
||||
|
@ -151,6 +155,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
.defaultValue("true")
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " +
|
||||
"messages will be received over a secure connection.")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
|
@ -176,6 +187,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(PROTOCOL);
|
||||
descriptors.add(PORT);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(RECV_BUFFER_SIZE);
|
||||
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
|
||||
descriptors.add(MAX_CONNECTIONS);
|
||||
|
@ -222,6 +234,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
results.add(new ValidationResult.Builder().subject("Parse Messages").input("true").valid(false)
|
||||
.explanation("Cannot set Parse Messages to 'true' if Batch Size is greater than 1").build());
|
||||
}
|
||||
|
||||
final String protocol = validationContext.getProperty(PROTOCOL).getValue();
|
||||
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
|
||||
if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.explanation("SSL can not be used with UDP")
|
||||
.valid(false).subject("SSL Context").build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
|
@ -251,7 +273,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
parser = new SyslogParser(Charset.forName(charSet));
|
||||
|
||||
// create either a UDP or TCP reader and call open() to bind to the given port
|
||||
channelReader = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections);
|
||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
channelReader = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections, sslContextService);
|
||||
channelReader.open(port, maxChannelBufferSize);
|
||||
|
||||
final Thread readerThread = new Thread(channelReader);
|
||||
|
@ -267,12 +290,18 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
|
||||
// visible for testing to be overridden and provide a mock ChannelReader if desired
|
||||
protected ChannelReader createChannelReader(final String protocol, final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents,
|
||||
int maxConnections)
|
||||
int maxConnections, final SSLContextService sslContextService)
|
||||
throws IOException {
|
||||
if (protocol.equals(UDP_VALUE.getValue())) {
|
||||
return new DatagramChannelReader(bufferPool, syslogEvents, getLogger());
|
||||
} else {
|
||||
return new SocketChannelReader(bufferPool, syslogEvents, getLogger(), maxConnections);
|
||||
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
|
||||
SSLContext sslContext = null;
|
||||
if (sslContextService != null) {
|
||||
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
|
||||
}
|
||||
|
||||
return new SocketChannelDispatcher(bufferPool, syslogEvents, getLogger(), maxConnections, sslContext);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -468,7 +497,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
/**
|
||||
* Reads messages from a channel until told to stop.
|
||||
*/
|
||||
public interface ChannelReader extends Runnable {
|
||||
private interface ChannelReader extends Runnable {
|
||||
|
||||
void open(int port, int maxBufferSize) throws IOException;
|
||||
|
||||
|
@ -483,7 +512,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
* Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for
|
||||
* processing, otherwise the buffer is returned to the buffer pool.
|
||||
*/
|
||||
public static class DatagramChannelReader implements ChannelReader {
|
||||
private static class DatagramChannelReader implements ChannelReader {
|
||||
|
||||
private final BlockingQueue<ByteBuffer> bufferPool;
|
||||
private final BlockingQueue<RawSyslogEvent> syslogEvents;
|
||||
|
@ -586,7 +615,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
* Accepts Socket connections on the given port and creates a handler for each connection to
|
||||
* be executed by a thread pool.
|
||||
*/
|
||||
public static class SocketChannelReader implements ChannelReader {
|
||||
private static class SocketChannelDispatcher implements ChannelReader {
|
||||
|
||||
private final BlockingQueue<ByteBuffer> bufferPool;
|
||||
private final BlockingQueue<RawSyslogEvent> syslogEvents;
|
||||
|
@ -597,13 +626,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
private final BlockingQueue<SelectionKey> keyQueue;
|
||||
private final int maxConnections;
|
||||
private final AtomicInteger currentConnections = new AtomicInteger(0);
|
||||
private final SSLContext sslContext;
|
||||
|
||||
public SocketChannelReader(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger, final int maxConnections) {
|
||||
public SocketChannelDispatcher(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents,
|
||||
final ProcessorLog logger, final int maxConnections, final SSLContext sslContext) {
|
||||
this.bufferPool = bufferPool;
|
||||
this.syslogEvents = syslogEvents;
|
||||
this.logger = logger;
|
||||
this.maxConnections = maxConnections;
|
||||
this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
|
||||
this.sslContext = sslContext;
|
||||
this.executor = Executors.newFixedThreadPool(maxConnections);
|
||||
}
|
||||
|
||||
|
@ -653,17 +685,32 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
// Set socket to non-blocking, and register with selector
|
||||
socketChannel.configureBlocking(false);
|
||||
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
|
||||
// Prepare the byte buffer for the reads, clear it out and attach to key
|
||||
|
||||
// Prepare the byte buffer for the reads, clear it out
|
||||
ByteBuffer buffer = bufferPool.poll();
|
||||
buffer.clear();
|
||||
buffer.mark();
|
||||
readKey.attach(buffer);
|
||||
|
||||
// If we have an SSLContext then create an SSLEngine for the channel
|
||||
SSLEngine sslEngine = null;
|
||||
if (sslContext != null) {
|
||||
sslEngine = sslContext.createSSLEngine();
|
||||
}
|
||||
|
||||
// Attach the buffer and SSLEngine to the key
|
||||
SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslEngine);
|
||||
readKey.attach(attachment);
|
||||
} else if (key.isReadable()) {
|
||||
// Clear out the operations the select is interested in until done reading
|
||||
key.interestOps(0);
|
||||
// Create and execute the read handler
|
||||
final SocketChannelHandler handler = new SocketChannelHandler(key, this, syslogEvents, logger);
|
||||
// and launch the thread
|
||||
// Create a handler based on whether an SSLEngine was provided or not
|
||||
final Runnable handler;
|
||||
if (sslContext != null) {
|
||||
handler = new SSLSocketChannelHandler(key, this, syslogEvents, logger);
|
||||
} else {
|
||||
handler = new SocketChannelHandler(key, this, syslogEvents, logger);
|
||||
}
|
||||
// run the handler
|
||||
executor.execute(handler);
|
||||
}
|
||||
}
|
||||
|
@ -722,7 +769,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
public void completeConnection(SelectionKey key) {
|
||||
// connection is done. Return the buffer to the pool
|
||||
try {
|
||||
bufferPool.put((ByteBuffer) key.attachment());
|
||||
SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
|
||||
bufferPool.put(attachment.getByteBuffer());
|
||||
} catch (InterruptedException e) {
|
||||
// nothing to do here
|
||||
}
|
||||
|
@ -740,15 +788,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
* Reads from the given SocketChannel into the provided buffer. If data is read then the buffer is queued for
|
||||
* processing, otherwise the buffer is returned to the buffer pool.
|
||||
*/
|
||||
public static class SocketChannelHandler implements Runnable {
|
||||
private static class SocketChannelHandler implements Runnable {
|
||||
|
||||
private final SelectionKey key;
|
||||
private final SocketChannelReader dispatcher;
|
||||
private final SocketChannelDispatcher dispatcher;
|
||||
private final BlockingQueue<RawSyslogEvent> syslogEvents;
|
||||
private final ProcessorLog logger;
|
||||
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
|
||||
|
||||
public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) {
|
||||
public SocketChannelHandler(final SelectionKey key, final SocketChannelDispatcher dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) {
|
||||
this.key = key;
|
||||
this.dispatcher = dispatcher;
|
||||
this.syslogEvents = syslogEvents;
|
||||
|
@ -759,12 +807,14 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
public void run() {
|
||||
boolean eof = false;
|
||||
SocketChannel socketChannel = null;
|
||||
ByteBuffer socketBuffer = null;
|
||||
|
||||
try {
|
||||
int bytesRead;
|
||||
socketChannel = (SocketChannel) key.channel();
|
||||
socketBuffer = (ByteBuffer) key.attachment();
|
||||
|
||||
SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
|
||||
ByteBuffer socketBuffer = attachment.getByteBuffer();
|
||||
|
||||
// read until the buffer is full
|
||||
while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
|
||||
// prepare byte buffer for reading
|
||||
|
@ -809,7 +859,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
// Treat same as closed socket
|
||||
eof = true;
|
||||
} catch (IOException e) {
|
||||
logger.error("Error reading from channel", e);
|
||||
logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e);
|
||||
// Treat same as closed socket
|
||||
eof = true;
|
||||
} finally {
|
||||
|
@ -823,6 +873,81 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a SocketChannel with an SSLSocketChannel for receiving messages over TLS.
|
||||
*/
|
||||
private static class SSLSocketChannelHandler implements Runnable {
|
||||
|
||||
private final SelectionKey key;
|
||||
private final SocketChannelDispatcher dispatcher;
|
||||
private final BlockingQueue<RawSyslogEvent> syslogEvents;
|
||||
private final ProcessorLog logger;
|
||||
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
|
||||
|
||||
public SSLSocketChannelHandler(final SelectionKey key, final SocketChannelDispatcher dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) {
|
||||
this.key = key;
|
||||
this.dispatcher = dispatcher;
|
||||
this.syslogEvents = syslogEvents;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
boolean eof = false;
|
||||
SSLSocketChannel sslSocketChannel = null;
|
||||
try {
|
||||
int bytesRead;
|
||||
final SocketChannel socketChannel = (SocketChannel) key.channel();
|
||||
final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
|
||||
|
||||
// wrap the SocketChannel with an SSLSocketChannel using the SSLEngine from the attachment
|
||||
sslSocketChannel = new SSLSocketChannel(attachment.getSslEngine(), socketChannel, false);
|
||||
|
||||
// SSLSocketChannel deals with byte[] so ByteBuffer isn't used here, but we'll use the size to create a new byte[]
|
||||
final ByteBuffer socketBuffer = attachment.getByteBuffer();
|
||||
byte[] socketBufferArray = new byte[socketBuffer.limit()];
|
||||
|
||||
// read until no more data
|
||||
while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) {
|
||||
// go through the buffer looking for the end of each message
|
||||
for (int i = 0; i < bytesRead; i++) {
|
||||
final byte currByte = socketBufferArray[i];
|
||||
currBytes.write(currByte);
|
||||
|
||||
// check if at end of a message
|
||||
if (currByte == '\n') {
|
||||
final String sender = socketChannel.socket().getInetAddress().toString();
|
||||
// queue the raw event blocking until space is available, reset the temporary buffer
|
||||
syslogEvents.put(new RawSyslogEvent(currBytes.toByteArray(), sender));
|
||||
currBytes.reset();
|
||||
}
|
||||
}
|
||||
logger.debug("done handling SocketChannel");
|
||||
}
|
||||
|
||||
// Check for closed socket
|
||||
if( bytesRead < 0 ){
|
||||
eof = true;
|
||||
}
|
||||
} catch (ClosedByInterruptException | InterruptedException e) {
|
||||
logger.debug("read loop interrupted, closing connection");
|
||||
// Treat same as closed socket
|
||||
eof = true;
|
||||
} catch (IOException e) {
|
||||
logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e);
|
||||
// Treat same as closed socket
|
||||
eof = true;
|
||||
} finally {
|
||||
if(eof == true) {
|
||||
IOUtils.closeQuietly(sslSocketChannel);
|
||||
dispatcher.completeConnection(key);
|
||||
} else {
|
||||
dispatcher.addBackForSelection(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {
|
||||
logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
|
||||
+ actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's "
|
||||
|
@ -830,7 +955,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
}
|
||||
|
||||
// Wrapper class to pass around the raw message and the host/ip that sent it
|
||||
public static class RawSyslogEvent {
|
||||
static class RawSyslogEvent {
|
||||
|
||||
final byte[] rawMessage;
|
||||
final String sender;
|
||||
|
@ -850,4 +975,25 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
|
||||
}
|
||||
|
||||
// Wrapper class so we can attach a buffer and/or an SSLEngine to the selector key
|
||||
private static class SocketChannelAttachment {
|
||||
|
||||
private final ByteBuffer byteBuffer;
|
||||
private final SSLEngine sslEngine;
|
||||
|
||||
public SocketChannelAttachment(ByteBuffer byteBuffer, SSLEngine sslEngine) {
|
||||
this.byteBuffer = byteBuffer;
|
||||
this.sslEngine = sslEngine;
|
||||
}
|
||||
|
||||
public ByteBuffer getByteBuffer() {
|
||||
return byteBuffer;
|
||||
}
|
||||
|
||||
public SSLEngine getSslEngine() {
|
||||
return sslEngine;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.apache.nifi.annotation.documentation.Tags;
|
|||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
|
@ -34,9 +36,12 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.SyslogParser;
|
||||
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.ObjectHolder;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -45,6 +50,7 @@ import java.nio.channels.DatagramChannel;
|
|||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -135,6 +141,13 @@ public class PutSyslog extends AbstractSyslogProcessor {
|
|||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " +
|
||||
"messages will be sent over a secure connection.")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
|
@ -161,6 +174,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
|
|||
descriptors.add(HOSTNAME);
|
||||
descriptors.add(PROTOCOL);
|
||||
descriptors.add(PORT);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(IDLE_EXPIRATION);
|
||||
descriptors.add(SEND_BUFFER_SIZE);
|
||||
descriptors.add(BATCH_SIZE);
|
||||
|
@ -189,6 +203,22 @@ public class PutSyslog extends AbstractSyslogProcessor {
|
|||
return descriptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
|
||||
final Collection<ValidationResult> results = new ArrayList<>();
|
||||
|
||||
final String protocol = context.getProperty(PROTOCOL).getValue();
|
||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
|
||||
if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.explanation("SSL can not be used with UDP")
|
||||
.valid(false).subject("SSL Context").build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) throws IOException {
|
||||
final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
|
@ -197,11 +227,8 @@ public class PutSyslog extends AbstractSyslogProcessor {
|
|||
this.bufferPool.offer(ByteBuffer.allocate(bufferSize));
|
||||
}
|
||||
|
||||
// create a pool of senders based on the number of concurrent tasks for this processor
|
||||
// initialize the queue of senders, one per task, senders will get created on the fly in onTrigger
|
||||
this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
|
||||
for (int i=0; i < context.getMaxConcurrentTasks(); i++) {
|
||||
senderPool.offer(createSender(context, bufferPool));
|
||||
}
|
||||
}
|
||||
|
||||
protected ChannelSender createSender(final ProcessContext context, final BlockingQueue<ByteBuffer> bufferPool) throws IOException {
|
||||
|
@ -209,27 +236,37 @@ public class PutSyslog extends AbstractSyslogProcessor {
|
|||
final String host = context.getProperty(HOSTNAME).getValue();
|
||||
final String protocol = context.getProperty(PROTOCOL).getValue();
|
||||
final String charSet = context.getProperty(CHARSET).getValue();
|
||||
return createSender(protocol, host, port, Charset.forName(charSet), bufferPool);
|
||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
return createSender(sslContextService, protocol, host, port, Charset.forName(charSet), bufferPool);
|
||||
}
|
||||
|
||||
// visible for testing to override and provide a mock sender if desired
|
||||
protected ChannelSender createSender(final String protocol, final String host, final int port, final Charset charset, final BlockingQueue<ByteBuffer> bufferPool)
|
||||
protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host, final int port,
|
||||
final Charset charset, final BlockingQueue<ByteBuffer> bufferPool)
|
||||
throws IOException {
|
||||
if (protocol.equals(UDP_VALUE.getValue())) {
|
||||
return new DatagramChannelSender(host, port, bufferPool, charset);
|
||||
} else {
|
||||
// if an SSLContextService is provided then we make a secure sender
|
||||
if (sslContextService != null) {
|
||||
final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
|
||||
return new SSLSocketChannelSender(sslContext, host, port, bufferPool, charset);
|
||||
} else {
|
||||
return new SocketChannelSender(host, port, bufferPool, charset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void onStopped() {
|
||||
if (senderPool != null) {
|
||||
ChannelSender sender = senderPool.poll();
|
||||
while (sender != null) {
|
||||
sender.close();
|
||||
sender = senderPool.poll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void pruneIdleSenders(final long idleThreshold){
|
||||
long currentTime = System.currentTimeMillis();
|
||||
|
@ -362,7 +399,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
|
|||
/**
|
||||
* Base class for sending messages over a channel.
|
||||
*/
|
||||
public static abstract class ChannelSender {
|
||||
protected static abstract class ChannelSender {
|
||||
|
||||
final int port;
|
||||
final String host;
|
||||
|
@ -418,7 +455,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
|
|||
/**
|
||||
* Sends messages over a DatagramChannel.
|
||||
*/
|
||||
static class DatagramChannelSender extends ChannelSender {
|
||||
private static class DatagramChannelSender extends ChannelSender {
|
||||
|
||||
final DatagramChannel channel;
|
||||
|
||||
|
@ -449,7 +486,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
|
|||
/**
|
||||
* Sends messages over a SocketChannel.
|
||||
*/
|
||||
static class SocketChannelSender extends ChannelSender {
|
||||
private static class SocketChannelSender extends ChannelSender {
|
||||
|
||||
final SocketChannel channel;
|
||||
|
||||
|
@ -477,4 +514,39 @@ public class PutSyslog extends AbstractSyslogProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends messages over an SSLSocketChannel.
|
||||
*/
|
||||
private static class SSLSocketChannelSender extends ChannelSender {
|
||||
|
||||
final SSLSocketChannel channel;
|
||||
|
||||
SSLSocketChannelSender(final SSLContext sslContext, final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
|
||||
super(host, port, bufferPool, charset);
|
||||
this.channel = new SSLSocketChannel(sslContext, host, port, true);
|
||||
this.channel.connect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(final String message) throws IOException {
|
||||
final byte[] bytes = message.getBytes(charset);
|
||||
channel.write(bytes);
|
||||
lastUsed = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ByteBuffer buffer) throws IOException {
|
||||
// nothing to do here since we are overriding send() above
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isConnected() {
|
||||
return channel != null && !channel.isClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeQuietly(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -164,6 +164,57 @@ public class TestListenSyslog {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException {
|
||||
final ListenSyslog proc = new ListenSyslog();
|
||||
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
|
||||
runner.setProperty(ListenSyslog.PORT, "0");
|
||||
|
||||
// schedule to start listening on a random port
|
||||
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
|
||||
final ProcessContext context = runner.getProcessContext();
|
||||
proc.onScheduled(context);
|
||||
|
||||
final int numMessages = 3;
|
||||
final int port = proc.getPort();
|
||||
Assert.assertTrue(port > 0);
|
||||
|
||||
// send 3 messages as 1
|
||||
final String multipleMessages = VALID_MESSAGE + "\n" + VALID_MESSAGE + "\n" + VALID_MESSAGE;
|
||||
final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages));
|
||||
sender.setDaemon(true);
|
||||
sender.start();
|
||||
|
||||
// call onTrigger until we read all messages, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(10);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
}
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
|
||||
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
|
||||
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
|
||||
|
||||
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
|
||||
Assert.assertNotNull(events);
|
||||
Assert.assertEquals(numMessages, events.size());
|
||||
|
||||
final ProvenanceEventRecord event = events.get(0);
|
||||
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
|
||||
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
|
||||
|
||||
} finally {
|
||||
// unschedule to close connections
|
||||
proc.onUnscheduled();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTCPMultipleConnection() throws IOException, InterruptedException {
|
||||
final ListenSyslog proc = new ListenSyslog();
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
|
|||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
|
@ -326,7 +327,8 @@ public class TestPutSyslog {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
|
||||
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port,
|
||||
Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
|
||||
return mockSender;
|
||||
}
|
||||
}
|
||||
|
@ -344,7 +346,8 @@ public class TestPutSyslog {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
|
||||
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port,
|
||||
Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
|
||||
if (numSendersCreated >= numSendersAllowed) {
|
||||
throw new IOException("too many senders");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue