This commit is contained in:
Mark Payne 2015-12-03 12:13:33 -05:00
commit dae25accb7
5 changed files with 345 additions and 69 deletions

View File

@ -85,6 +85,10 @@ public class SSLSocketChannel implements Closeable {
}
public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
this(sslContext.createSSLEngine(), socketChannel, client);
}
public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel socketChannel, final boolean client) throws IOException {
if (!socketChannel.isConnected()) {
throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
}
@ -96,9 +100,9 @@ public class SSLSocketChannel implements Closeable {
this.hostname = socket.getInetAddress().getHostName();
this.port = socket.getPort();
this.engine = sslContext.createSSLEngine();
this.engine = sslEngine;
this.engine.setUseClientMode(client);
engine.setNeedClientAuth(true);
this.engine.setNeedClientAuth(true);
streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));

View File

@ -16,6 +16,38 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
@ -46,34 +78,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@ -129,8 +133,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size")
.description(
"The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with "
+ "the <Message Delimiter> up to this configured maximum number of messages")
"The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with "
+ "the <Message Delimiter> up to this configured maximum number of messages")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("1")
@ -151,6 +155,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.defaultValue("true")
.required(true)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " +
"messages will be received over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -176,6 +187,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PROTOCOL);
descriptors.add(PORT);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(MAX_CONNECTIONS);
@ -222,6 +234,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
results.add(new ValidationResult.Builder().subject("Parse Messages").input("true").valid(false)
.explanation("Cannot set Parse Messages to 'true' if Batch Size is greater than 1").build());
}
final String protocol = validationContext.getProperty(PROTOCOL).getValue();
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) {
results.add(new ValidationResult.Builder()
.explanation("SSL can not be used with UDP")
.valid(false).subject("SSL Context").build());
}
return results;
}
@ -251,7 +273,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
parser = new SyslogParser(Charset.forName(charSet));
// create either a UDP or TCP reader and call open() to bind to the given port
channelReader = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
channelReader = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections, sslContextService);
channelReader.open(port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelReader);
@ -267,12 +290,18 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// visible for testing to be overridden and provide a mock ChannelReader if desired
protected ChannelReader createChannelReader(final String protocol, final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents,
int maxConnections)
int maxConnections, final SSLContextService sslContextService)
throws IOException {
if (protocol.equals(UDP_VALUE.getValue())) {
return new DatagramChannelReader(bufferPool, syslogEvents, getLogger());
} else {
return new SocketChannelReader(bufferPool, syslogEvents, getLogger(), maxConnections);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
if (sslContextService != null) {
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
}
return new SocketChannelDispatcher(bufferPool, syslogEvents, getLogger(), maxConnections, sslContext);
}
}
@ -468,7 +497,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
/**
* Reads messages from a channel until told to stop.
*/
public interface ChannelReader extends Runnable {
private interface ChannelReader extends Runnable {
void open(int port, int maxBufferSize) throws IOException;
@ -483,7 +512,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
* Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for
* processing, otherwise the buffer is returned to the buffer pool.
*/
public static class DatagramChannelReader implements ChannelReader {
private static class DatagramChannelReader implements ChannelReader {
private final BlockingQueue<ByteBuffer> bufferPool;
private final BlockingQueue<RawSyslogEvent> syslogEvents;
@ -586,7 +615,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
* Accepts Socket connections on the given port and creates a handler for each connection to
* be executed by a thread pool.
*/
public static class SocketChannelReader implements ChannelReader {
private static class SocketChannelDispatcher implements ChannelReader {
private final BlockingQueue<ByteBuffer> bufferPool;
private final BlockingQueue<RawSyslogEvent> syslogEvents;
@ -597,13 +626,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final BlockingQueue<SelectionKey> keyQueue;
private final int maxConnections;
private final AtomicInteger currentConnections = new AtomicInteger(0);
private final SSLContext sslContext;
public SocketChannelReader(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger, final int maxConnections) {
public SocketChannelDispatcher(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents,
final ProcessorLog logger, final int maxConnections, final SSLContext sslContext) {
this.bufferPool = bufferPool;
this.syslogEvents = syslogEvents;
this.logger = logger;
this.maxConnections = maxConnections;
this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
this.sslContext = sslContext;
this.executor = Executors.newFixedThreadPool(maxConnections);
}
@ -649,21 +681,36 @@ public class ListenSyslog extends AbstractSyslogProcessor {
continue;
}
logger.debug("Accepted incoming connection from {}",
new Object[]{socketChannel.getRemoteAddress().toString()} );
new Object[]{socketChannel.getRemoteAddress().toString()});
// Set socket to non-blocking, and register with selector
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
// Prepare the byte buffer for the reads, clear it out and attach to key
// Prepare the byte buffer for the reads, clear it out
ByteBuffer buffer = bufferPool.poll();
buffer.clear();
buffer.mark();
readKey.attach(buffer);
// If we have an SSLContext then create an SSLEngine for the channel
SSLEngine sslEngine = null;
if (sslContext != null) {
sslEngine = sslContext.createSSLEngine();
}
// Attach the buffer and SSLEngine to the key
SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslEngine);
readKey.attach(attachment);
} else if (key.isReadable()) {
// Clear out the operations the select is interested in until done reading
key.interestOps(0);
// Create and execute the read handler
final SocketChannelHandler handler = new SocketChannelHandler(key, this, syslogEvents, logger);
// and launch the thread
// Create a handler based on whether an SSLEngine was provided or not
final Runnable handler;
if (sslContext != null) {
handler = new SSLSocketChannelHandler(key, this, syslogEvents, logger);
} else {
handler = new SocketChannelHandler(key, this, syslogEvents, logger);
}
// run the handler
executor.execute(handler);
}
}
@ -722,7 +769,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public void completeConnection(SelectionKey key) {
// connection is done. Return the buffer to the pool
try {
bufferPool.put((ByteBuffer) key.attachment());
SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
bufferPool.put(attachment.getByteBuffer());
} catch (InterruptedException e) {
// nothing to do here
}
@ -740,15 +788,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
* Reads from the given SocketChannel into the provided buffer. If data is read then the buffer is queued for
* processing, otherwise the buffer is returned to the buffer pool.
*/
public static class SocketChannelHandler implements Runnable {
private static class SocketChannelHandler implements Runnable {
private final SelectionKey key;
private final SocketChannelReader dispatcher;
private final SocketChannelDispatcher dispatcher;
private final BlockingQueue<RawSyslogEvent> syslogEvents;
private final ProcessorLog logger;
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) {
public SocketChannelHandler(final SelectionKey key, final SocketChannelDispatcher dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) {
this.key = key;
this.dispatcher = dispatcher;
this.syslogEvents = syslogEvents;
@ -759,12 +807,14 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public void run() {
boolean eof = false;
SocketChannel socketChannel = null;
ByteBuffer socketBuffer = null;
try {
int bytesRead;
socketChannel = (SocketChannel) key.channel();
socketBuffer = (ByteBuffer) key.attachment();
SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
ByteBuffer socketBuffer = attachment.getByteBuffer();
// read until the buffer is full
while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
// prepare byte buffer for reading
@ -809,7 +859,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// Treat same as closed socket
eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e);
// Treat same as closed socket
eof = true;
} finally {
@ -823,6 +873,81 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
/**
* Wraps a SocketChannel with an SSLSocketChannel for receiving messages over TLS.
*/
private static class SSLSocketChannelHandler implements Runnable {
private final SelectionKey key;
private final SocketChannelDispatcher dispatcher;
private final BlockingQueue<RawSyslogEvent> syslogEvents;
private final ProcessorLog logger;
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
public SSLSocketChannelHandler(final SelectionKey key, final SocketChannelDispatcher dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) {
this.key = key;
this.dispatcher = dispatcher;
this.syslogEvents = syslogEvents;
this.logger = logger;
}
@Override
public void run() {
boolean eof = false;
SSLSocketChannel sslSocketChannel = null;
try {
int bytesRead;
final SocketChannel socketChannel = (SocketChannel) key.channel();
final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
// wrap the SocketChannel with an SSLSocketChannel using the SSLEngine from the attachment
sslSocketChannel = new SSLSocketChannel(attachment.getSslEngine(), socketChannel, false);
// SSLSocketChannel deals with byte[] so ByteBuffer isn't used here, but we'll use the size to create a new byte[]
final ByteBuffer socketBuffer = attachment.getByteBuffer();
byte[] socketBufferArray = new byte[socketBuffer.limit()];
// read until no more data
while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) {
// go through the buffer looking for the end of each message
for (int i = 0; i < bytesRead; i++) {
final byte currByte = socketBufferArray[i];
currBytes.write(currByte);
// check if at end of a message
if (currByte == '\n') {
final String sender = socketChannel.socket().getInetAddress().toString();
// queue the raw event blocking until space is available, reset the temporary buffer
syslogEvents.put(new RawSyslogEvent(currBytes.toByteArray(), sender));
currBytes.reset();
}
}
logger.debug("done handling SocketChannel");
}
// Check for closed socket
if( bytesRead < 0 ){
eof = true;
}
} catch (ClosedByInterruptException | InterruptedException e) {
logger.debug("read loop interrupted, closing connection");
// Treat same as closed socket
eof = true;
} catch (IOException e) {
logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e);
// Treat same as closed socket
eof = true;
} finally {
if(eof == true) {
IOUtils.closeQuietly(sslSocketChannel);
dispatcher.completeConnection(key);
} else {
dispatcher.addBackForSelection(key);
}
}
}
}
static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {
logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
+ actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's "
@ -830,7 +955,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
// Wrapper class to pass around the raw message and the host/ip that sent it
public static class RawSyslogEvent {
static class RawSyslogEvent {
final byte[] rawMessage;
final String sender;
@ -850,4 +975,25 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
// Wrapper class so we can attach a buffer and/or an SSLEngine to the selector key
private static class SocketChannelAttachment {
private final ByteBuffer byteBuffer;
private final SSLEngine sslEngine;
public SocketChannelAttachment(ByteBuffer byteBuffer, SSLEngine sslEngine) {
this.byteBuffer = byteBuffer;
this.sslEngine = sslEngine;
}
public ByteBuffer getByteBuffer() {
return byteBuffer;
}
public SSLEngine getSslEngine() {
return sslEngine;
}
}
}

View File

@ -25,6 +25,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@ -34,9 +36,12 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -45,6 +50,7 @@ import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -135,6 +141,13 @@ public class PutSyslog extends AbstractSyslogProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " +
"messages will be sent over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -161,6 +174,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
descriptors.add(HOSTNAME);
descriptors.add(PROTOCOL);
descriptors.add(PORT);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(IDLE_EXPIRATION);
descriptors.add(SEND_BUFFER_SIZE);
descriptors.add(BATCH_SIZE);
@ -189,6 +203,22 @@ public class PutSyslog extends AbstractSyslogProcessor {
return descriptors;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final String protocol = context.getProperty(PROTOCOL).getValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) {
results.add(new ValidationResult.Builder()
.explanation("SSL can not be used with UDP")
.valid(false).subject("SSL Context").build());
}
return results;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
@ -197,11 +227,8 @@ public class PutSyslog extends AbstractSyslogProcessor {
this.bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
// create a pool of senders based on the number of concurrent tasks for this processor
// initialize the queue of senders, one per task, senders will get created on the fly in onTrigger
this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
for (int i=0; i < context.getMaxConcurrentTasks(); i++) {
senderPool.offer(createSender(context, bufferPool));
}
}
protected ChannelSender createSender(final ProcessContext context, final BlockingQueue<ByteBuffer> bufferPool) throws IOException {
@ -209,25 +236,35 @@ public class PutSyslog extends AbstractSyslogProcessor {
final String host = context.getProperty(HOSTNAME).getValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
return createSender(protocol, host, port, Charset.forName(charSet), bufferPool);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
return createSender(sslContextService, protocol, host, port, Charset.forName(charSet), bufferPool);
}
// visible for testing to override and provide a mock sender if desired
protected ChannelSender createSender(final String protocol, final String host, final int port, final Charset charset, final BlockingQueue<ByteBuffer> bufferPool)
protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host, final int port,
final Charset charset, final BlockingQueue<ByteBuffer> bufferPool)
throws IOException {
if (protocol.equals(UDP_VALUE.getValue())) {
return new DatagramChannelSender(host, port, bufferPool, charset);
} else {
return new SocketChannelSender(host, port, bufferPool, charset);
// if an SSLContextService is provided then we make a secure sender
if (sslContextService != null) {
final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
return new SSLSocketChannelSender(sslContext, host, port, bufferPool, charset);
} else {
return new SocketChannelSender(host, port, bufferPool, charset);
}
}
}
@OnStopped
public void onStopped() {
ChannelSender sender = senderPool.poll();
while (sender != null) {
sender.close();
sender = senderPool.poll();
if (senderPool != null) {
ChannelSender sender = senderPool.poll();
while (sender != null) {
sender.close();
sender = senderPool.poll();
}
}
}
@ -362,7 +399,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
/**
* Base class for sending messages over a channel.
*/
public static abstract class ChannelSender {
protected static abstract class ChannelSender {
final int port;
final String host;
@ -418,7 +455,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
/**
* Sends messages over a DatagramChannel.
*/
static class DatagramChannelSender extends ChannelSender {
private static class DatagramChannelSender extends ChannelSender {
final DatagramChannel channel;
@ -449,7 +486,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
/**
* Sends messages over a SocketChannel.
*/
static class SocketChannelSender extends ChannelSender {
private static class SocketChannelSender extends ChannelSender {
final SocketChannel channel;
@ -477,4 +514,39 @@ public class PutSyslog extends AbstractSyslogProcessor {
}
}
/**
* Sends messages over an SSLSocketChannel.
*/
private static class SSLSocketChannelSender extends ChannelSender {
final SSLSocketChannel channel;
SSLSocketChannelSender(final SSLContext sslContext, final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
super(host, port, bufferPool, charset);
this.channel = new SSLSocketChannel(sslContext, host, port, true);
this.channel.connect();
}
@Override
public void send(final String message) throws IOException {
final byte[] bytes = message.getBytes(charset);
channel.write(bytes);
lastUsed = System.currentTimeMillis();
}
@Override
public void write(ByteBuffer buffer) throws IOException {
// nothing to do here since we are overriding send() above
}
@Override
boolean isConnected() {
return channel != null && !channel.isClosed();
}
@Override
public void close() {
IOUtils.closeQuietly(channel);
}
}
}

View File

@ -164,6 +164,57 @@ public class TestListenSyslog {
}
}
@Test
public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException {
final ListenSyslog proc = new ListenSyslog();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
runner.setProperty(ListenSyslog.PORT, "0");
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
final int numMessages = 3;
final int port = proc.getPort();
Assert.assertTrue(port > 0);
// send 3 messages as 1
final String multipleMessages = VALID_MESSAGE + "\n" + VALID_MESSAGE + "\n" + VALID_MESSAGE;
final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages));
sender.setDaemon(true);
sender.start();
// call onTrigger until we read all messages, or 30 seconds passed
try {
int numTransfered = 0;
long timeout = System.currentTimeMillis() + 30000;
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
Thread.sleep(10);
proc.onTrigger(context, processSessionFactory);
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
}
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(numMessages, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
} finally {
// unschedule to close connections
proc.onUnscheduled();
}
}
@Test
public void testTCPMultipleConnection() throws IOException, InterruptedException {
final ListenSyslog proc = new ListenSyslog();

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
@ -326,7 +327,8 @@ public class TestPutSyslog {
}
@Override
protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port,
Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
return mockSender;
}
}
@ -344,7 +346,8 @@ public class TestPutSyslog {
}
@Override
protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port,
Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
if (numSendersCreated >= numSendersAllowed) {
throw new IOException("too many senders");
}