From 9c542432da7c9df6087a180cd939820dcce7008d Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Tue, 13 Oct 2015 09:48:19 -0400 Subject: [PATCH] NIFI-274 Adding ListenSyslog and PutSyslog to standard processors. - Refactoring connection handling on put side, removing number of buffers from properties and basing it off concurrent tasks for the processor. - Refactoring some of the TCP handling so it keeps reading from a connection until the client closes it - Adding an error queue - Adding a sender field on the syslog event to record the system that sent the message --- .../standard/AbstractSyslogProcessor.java | 81 +++ .../processors/standard/ListenSyslog.java | 527 ++++++++++++++++++ .../nifi/processors/standard/PutSyslog.java | 460 +++++++++++++++ .../processors/standard/util/SyslogEvent.java | 180 ++++++ .../standard/util/SyslogParser.java | 165 ++++++ .../org.apache.nifi.processor.Processor | 2 + .../processors/standard/TestListenSyslog.java | 426 ++++++++++++++ .../processors/standard/TestPutSyslog.java | 398 +++++++++++++ .../standard/util/TestSyslogParser.java | 253 +++++++++ nifi-nar-bundles/pom.xml | 4 +- 10 files changed, 2494 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java create mode 100644 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java new file mode 100644 index 0000000000..f7d5eeb278 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Base class for Syslog processors.
+ *
+ * <p>Holds the property descriptors (protocol, port, character set) and the FlowFile attribute
+ * keys that are shared by the Syslog processors extending this class.</p>
+ */
+public abstract class AbstractSyslogProcessor extends AbstractProcessor {
+
+    /** Allowable value selecting TCP for the {@link #PROTOCOL} property. */
+    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
+    /** Allowable value selecting UDP for the {@link #PROTOCOL} property; this is the default. */
+    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
+
+    /** Transport protocol used for Syslog communication; required, restricted to TCP or UDP. */
+    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
+            .Builder().name("Protocol")
+            .description("The protocol for Syslog communication, either TCP or UDP.")
+            .required(true)
+            .allowableValues(TCP_VALUE, UDP_VALUE)
+            .defaultValue(UDP_VALUE.getValue())
+            .build();
+    /** Port used for Syslog communication; required, checked by the standard port validator. */
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port for Syslog communication.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    /** Character set of the Syslog messages; required, defaults to UTF-8. */
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the Syslog messages")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+
+    /**
+     * FlowFile Attributes for each Syslog message.
+     */
+    public enum SyslogAttributes implements FlowFileAttributeKey {
+        PRIORITY("syslog.priority"),
+        SEVERITY("syslog.severity"),
+        FACILITY("syslog.facility"),
+        VERSION("syslog.version"),
+        TIMESTAMP("syslog.timestamp"),
+        HOSTNAME("syslog.hostname"),
+        SENDER("syslog.sender"),
+        BODY("syslog.body"),
+        VALID("syslog.valid");
+
+        // the attribute name; assigned once in the constructor and never changed
+        private final String key;
+
+        SyslogAttributes(String key) {
+            this.key = key;
+        }
+
+        /**
+         * @return the FlowFile attribute name for this constant, e.g. {@code syslog.priority}
+         */
+        @Override
+        public String key() {
+            return key;
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
new file mode 100644
index 0000000000..9f57c9f063
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SyslogEvent;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Processor that binds to a port, reads Syslog messages over UDP or TCP on a background reader
+ * thread, and turns every queued {@link SyslogEvent} into one FlowFile.
+ *
+ * <p>The reader thread fills {@code syslogEvents}; {@link #onTrigger} drains the error queue first
+ * and then the main queue, one event per invocation.</p>
+ */
+@Tags({"syslog", "listen", "udp", "tcp", "logs"})
+@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
+    "expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
+    "where version is optional. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
+    "or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If an incoming message matches one of these patterns, the message will be " +
+    "parsed and the individual pieces will be placed in FlowFile attributes, with the original message in the content of the FlowFile. If an incoming " +
+    "message does not match one of these patterns it will not be parsed and the syslog.valid attribute will be set to false with the original message " +
+    "in the content of the FlowFile. Valid messages will be transferred on the success relationship, and invalid messages will be transferred on the " +
+    "invalid relationship.")
+@WritesAttributes({ @WritesAttribute(attribute="syslog.priority", description="The priority of the Syslog message."),
+                    @WritesAttribute(attribute="syslog.severity", description="The severity of the Syslog message derived from the priority."),
+                    @WritesAttribute(attribute="syslog.facility", description="The facility of the Syslog message derived from the priority."),
+                    @WritesAttribute(attribute="syslog.version", description="The optional version from the Syslog message."),
+                    @WritesAttribute(attribute="syslog.timestamp", description="The timestamp of the Syslog message."),
+                    @WritesAttribute(attribute="syslog.hostname", description="The hostname of the Syslog message."),
+                    @WritesAttribute(attribute="syslog.sender", description="The hostname of the Syslog server that sent the message."),
+                    @WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."),
+                    @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +
+                            "If this value is false, the other attributes will be empty and only the original message will be available in the content."),
+                    @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
+public class ListenSyslog extends AbstractSyslogProcessor {
+
+    // 65507 bytes is the largest payload a single UDP datagram can carry over IPv4
+    // (65535 minus the 8-byte UDP header and the 20-byte IP header), so the default is given in
+    // bytes; one such buffer is allocated per concurrent task.
+    public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Receive Buffer Size")
+            .description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the " +
+                    "incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " +
+                    "from an incoming connection until the buffer is full, or the connection is closed. ")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("65507 B")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Socket Buffer")
+            .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " +
+                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
+                    "the data can be read, and incoming data will be dropped.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .required(true)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.")
+            .build();
+    public static final Relationship REL_INVALID = new Relationship.Builder()
+            .name("invalid")
+            .description("Syslog messages that do not match one of the expected formats will be sent out this relationship as a FlowFile per message.")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+
+    private volatile BufferPool bufferPool;
+    private volatile ChannelReader channelReader;
+    private volatile SyslogParser parser;
+    // events produced by the reader thread, waiting to be turned into FlowFiles
+    private volatile BlockingQueue<SyslogEvent> syslogEvents;
+    // events whose FlowFile could not be written; retried before the main queue
+    private volatile BlockingQueue<SyslogEvent> errorEvents;
+
+    /**
+     * Builds the immutable property descriptor list and relationship set exposed by this processor.
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(PROTOCOL);
+        descriptors.add(PORT);
+        descriptors.add(RECV_BUFFER_SIZE);
+        descriptors.add(MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(CHARSET);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_INVALID);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        // since properties were changed, clear any events that were queued
+        if (syslogEvents != null) {
+            syslogEvents.clear();
+        }
+        if (errorEvents != null) {
+            errorEvents.clear();
+        }
+    }
+
+    /**
+     * Creates the parser, buffer pool and event queues from the configured properties, opens a
+     * channel reader on the configured port and starts it on a daemon thread.
+     *
+     * @param context provides the property values and the maximum number of concurrent tasks
+     * @throws IOException if the channel reader cannot be created or opened
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        final int port = context.getProperty(PORT).asInteger();
+        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String protocol = context.getProperty(PROTOCOL).getValue();
+        final String charSet = context.getProperty(CHARSET).getValue();
+
+        parser = new SyslogParser(Charset.forName(charSet));
+        bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+        syslogEvents = new LinkedBlockingQueue<>(10);
+        errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+
+        // create either a UDP or TCP reader and call open() to bind to the given port
+        channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
+        channelReader.open(port, maxChannelBufferSize);
+
+        final Thread readerThread = new Thread(channelReader);
+        readerThread.setName("ListenSyslog [" + getIdentifier() + "]");
+        readerThread.setDaemon(true);
+        readerThread.start();
+    }
+
+    // visible for testing to be overridden and provide a mock ChannelReader if desired
+    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser,
+                                                final BlockingQueue<SyslogEvent> syslogEvents) throws IOException {
+        if (protocol.equals(UDP_VALUE.getValue())) {
+            return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+        } else {
+            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+        }
+    }
+
+    // used for testing to access the random port that was selected
+    protected int getPort() {
+        return channelReader == null ? 0 : channelReader.getPort();
+    }
+
+    /**
+     * Signals the channel reader to stop and closes its channel, if a reader was created.
+     */
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (channelReader != null) {
+            channelReader.stop();
+            channelReader.close();
+        }
+    }
+
+    /**
+     * Takes at most one queued event, copies its parsed fields into FlowFile attributes, writes the
+     * raw message as the FlowFile content and routes it to success or invalid.
+     *
+     * <p>If writing or transferring fails with a {@link ProcessException}, the event is put on the
+     * error queue so a later invocation can retry it, and the FlowFile is removed.</p>
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        // try to pull from the error queue first, if empty then pull from main queue
+        SyslogEvent initialEvent = errorEvents.poll();
+        if (initialEvent == null) {
+            initialEvent = syslogEvents.poll();
+        }
+
+        // if nothing in either queue then just return
+        if (initialEvent == null) {
+            return;
+        }
+
+        // final copy so the anonymous callback below can reference it
+        final SyslogEvent event = initialEvent;
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
+        attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
+        attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
+        attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
+        attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
+        attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
+        attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
+        attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
+        attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+        FlowFile flowFile = session.create();
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        try {
+            // write the raw bytes of the message as the FlowFile content
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(OutputStream out) throws IOException {
+                    out.write(event.getRawMessage());
+                }
+            });
+
+            if (event.isValid()) {
+                getLogger().info("Transferring {} to success", new Object[]{flowFile});
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
+                session.transfer(flowFile, REL_INVALID);
+            }
+
+        } catch (ProcessException e) {
+            getLogger().error("Error processing Syslog message", e);
+            // the error queue is bounded; offer() returns false instead of blocking when it is full
+            if (!errorEvents.offer(event)) {
+                getLogger().warn("Error queue is full, Syslog message will not be retried");
+            }
+            session.remove(flowFile);
+        }
+    }
+
+    /**
+     * Reads messages from a channel until told to stop.
+     */
+    public interface ChannelReader extends Runnable {
+
+        void open(int port, int maxBufferSize) throws IOException;
+
+        int getPort();
+
+        void stop();
+
+        void close();
+    }
+
+    /**
+     * Reads from the Datagram channel into an available buffer. Every received datagram is parsed
+     * into an event and queued for processing, and the buffer is always returned to the buffer pool.
+     */
+    public static class DatagramChannelReader implements ChannelReader {
+
+        private final BufferPool bufferPool;
+        private final SyslogParser syslogParser;
+        private final BlockingQueue<SyslogEvent> syslogEvents;
+        private final ProcessorLog logger;
+        private DatagramChannel datagramChannel;
+        private volatile boolean stopped = false;
+
+        public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
+                                     final ProcessorLog logger) {
+            this.bufferPool = bufferPool;
+            this.syslogParser = syslogParser;
+            this.syslogEvents = syslogEvents;
+            this.logger = logger;
+        }
+
+        /**
+         * Opens a non-blocking datagram channel, requests the given receive buffer size when it is
+         * positive (warning if the OS grants less), and binds to the port.
+         */
+        @Override
+        public void open(final int port, int maxBufferSize) throws IOException {
+            datagramChannel = DatagramChannel.open();
+            datagramChannel.configureBlocking(false);
+            if (maxBufferSize > 0) {
+                datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
+                final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+                if (actualReceiveBufSize < maxBufferSize) {
+                    logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize);
+                }
+            }
+            datagramChannel.socket().bind(new InetSocketAddress(port));
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                final ByteBuffer buffer = bufferPool.poll();
+                try {
+                    if (buffer == null) {
+                        Thread.sleep(10L);
+                        logger.debug("no available buffers, continuing...");
+                        continue;
+                    }
+
+                    // the channel is non-blocking, so receive() returns null when no datagram is waiting
+                    final SocketAddress sender = datagramChannel.receive(buffer);
+                    if (sender == null) {
+                        Thread.sleep(1000L); // nothing to do so wait...
+                    } else {
+                        final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
+                        logger.trace(event.getFullMessage());
+                        syslogEvents.put(event); // block until space is available
+                    }
+                } catch (InterruptedException e) {
+                    stop();
+                    // preserve the interrupt status for whoever owns this thread
+                    Thread.currentThread().interrupt();
+                } catch (IOException e) {
+                    logger.error("Error reading from DatagramChannel", e);
+                } finally {
+                    if (buffer != null) {
+                        bufferPool.returnBuffer(buffer, 0);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public int getPort() {
+            return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort();
+        }
+
+        @Override
+        public void stop() {
+            stopped = true;
+        }
+
+        @Override
+        public void close() {
+            IOUtils.closeQuietly(datagramChannel);
+        }
+    }
+
+    /**
+     * Accepts Socket connections on the given port and creates a handler for each connection to
+     * be executed by a thread pool.
+     */
+    public static class SocketChannelReader implements ChannelReader {
+
+        private final BufferPool bufferPool;
+        private final SyslogParser syslogParser;
+        private final BlockingQueue<SyslogEvent> syslogEvents;
+        private final ProcessorLog logger;
+        private ServerSocketChannel serverSocketChannel;
+        // two worker threads: at most two connection handlers run at the same time, others wait in the pool's queue
+        private final ExecutorService executor = Executors.newFixedThreadPool(2);
+        private volatile boolean stopped = false;
+
+        public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
+                                   final ProcessorLog logger) {
+            this.bufferPool = bufferPool;
+            this.syslogParser = syslogParser;
+            this.syslogEvents = syslogEvents;
+            this.logger = logger;
+        }
+
+        /**
+         * Opens a non-blocking server socket channel, requests the given receive buffer size when it
+         * is positive (warning if the OS grants less), and binds to the port.
+         */
+        @Override
+        public void open(final int port, int maxBufferSize) throws IOException {
+            serverSocketChannel = ServerSocketChannel.open();
+            serverSocketChannel.configureBlocking(false);
+            if (maxBufferSize > 0) {
+                serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
+                final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+                if (actualReceiveBufSize < maxBufferSize) {
+                    logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize);
+                }
+            }
+            serverSocketChannel.socket().bind(new InetSocketAddress(port));
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                try {
+                    // the server channel is non-blocking, so accept() returns null when no connection is pending
+                    final SocketChannel socketChannel = serverSocketChannel.accept();
+                    if (socketChannel == null) {
+                        Thread.sleep(1000L); // wait for an incoming connection...
+                    } else {
+                        final SocketChannelHandler handler = new SocketChannelHandler(
+                                bufferPool, socketChannel, syslogParser, syslogEvents, logger);
+                        logger.debug("Accepted incoming connection");
+                        executor.submit(handler);
+                    }
+                } catch (IOException e) {
+                    logger.error("Error accepting connection from SocketChannel", e);
+                } catch (InterruptedException e) {
+                    stop();
+                    // preserve the interrupt status for whoever owns this thread
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        @Override
+        public int getPort() {
+            return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
+        }
+
+        @Override
+        public void stop() {
+            stopped = true;
+        }
+
+        /**
+         * Closes the server channel and shuts the handler pool down, forcing termination if the
+         * handlers have not finished within one second.
+         */
+        @Override
+        public void close() {
+            IOUtils.closeQuietly(serverSocketChannel);
+            executor.shutdown();
+            try {
+                // Wait a while for existing tasks to terminate
+                if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException ie) {
+                // (Re-)Cancel if current thread also interrupted
+                executor.shutdownNow();
+                // Preserve interrupt status
+                Thread.currentThread().interrupt();
+            }
+        }
+
+    }
+
+    /**
+     * Reads from the given SocketChannel into a pooled buffer until the connection reaches end of
+     * stream or the thread is interrupted. Bytes are accumulated until a newline is seen, at which
+     * point the accumulated bytes are parsed into an event and queued for processing.
+     */
+    public static class SocketChannelHandler implements Runnable {
+
+        private final BufferPool bufferPool;
+        private final SocketChannel socketChannel;
+        private final SyslogParser syslogParser;
+        private final BlockingQueue<SyslogEvent> syslogEvents;
+        private final ProcessorLog logger;
+        // bytes of the message currently being assembled; survives across buffer reads
+        private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
+
+        public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
+                                    final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
+            this.bufferPool = bufferPool;
+            this.socketChannel = socketChannel;
+            this.syslogParser = syslogParser;
+            this.syslogEvents = syslogEvents;
+            this.logger = logger;
+        }
+
+        @Override
+        public void run() {
+            try {
+                int bytesRead = 0;
+                // a negative read count means the client closed the connection
+                while (bytesRead >= 0 && !Thread.interrupted()) {
+
+                    final ByteBuffer buffer = bufferPool.poll();
+                    if (buffer == null) {
+                        Thread.sleep(10L);
+                        logger.debug("no available buffers, continuing...");
+                        continue;
+                    }
+
+                    try {
+                        // read until the buffer is full
+                        bytesRead = socketChannel.read(buffer);
+                        while (bytesRead > 0) {
+                            bytesRead = socketChannel.read(buffer);
+                        }
+                        buffer.flip();
+
+                        // go through the buffer looking for the end of each message
+                        int bufferLength = buffer.limit();
+                        for (int i = 0; i < bufferLength; i++) {
+                            byte currByte = buffer.get(i);
+                            currBytes.write(currByte);
+
+                            // at the end of a message so parse an event, reset the accumulated bytes, and keep scanning
+                            if (currByte == '\n') {
+                                final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
+                                        socketChannel.socket().getInetAddress().toString());
+                                logger.trace(event.getFullMessage());
+                                syslogEvents.put(event); // block until space is available
+                                currBytes.reset();
+                            }
+                        }
+                    } finally {
+                        bufferPool.returnBuffer(buffer, 0);
+                    }
+                }
+
+                logger.debug("done handling SocketChannel");
+            } catch (ClosedByInterruptException | InterruptedException e) {
+                // nothing to do here
+            } catch (IOException e) {
+                logger.error("Error reading from channel", e);
+            } finally {
+                IOUtils.closeQuietly(socketChannel);
+            }
+        }
+
+    }
+
+    /**
+     * Logs a warning that the operating system granted a smaller socket receive buffer than requested.
+     *
+     * @param logger the logger to write the warning to
+     * @param maxBufferSize the receive buffer size that was requested, in bytes
+     * @param actualReceiveBufSize the receive buffer size reported by the channel, in bytes
+     */
+    static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {
+        logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
+                + actualReceiveBufSize + " bytes. You may want to consider changing the Operating System's "
+                + "maximum receive buffer");
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
new file mode 100644
index 0000000000..502b26f3ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.util.ObjectHolder; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@TriggerWhenEmpty +@Tags({"syslog", "put", "udp", "tcp", "logs"}) +@CapabilityDescription("Sends Syslog messages to a given host and port over TCP or UDP. Messages are constructed from the \"Message ___\" properties of the processor " + + "which can use expression language to generate messages from incoming FlowFiles. 
The properties are used to construct messages of the form: " + + "()(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The constructed messages are checked against regular expressions for " + + "RFC5424 and RFC3164 formatted messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " + + "or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " + + "above description, then it is routed to the invalid relationship. Valid messages are pushed to Syslog with successes routed to the success relationship, and " + + "failures routed to the failure relationship.") +public class PutSyslog extends AbstractSyslogProcessor { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The ip address or hostname of the Syslog server.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("localhost") + .required(true) + .build(); + public static final PropertyDescriptor SEND_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Send Buffer Size") + .description("The size of each buffer used to send a Syslog message. Adjust this value appropriately based on the expected size of the " + + "Syslog messages being produced. 
Messages larger than this buffer size will still be sent, but will not make use of the buffer pool.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("2048 B") + .required(true) + .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor + .Builder().name("Batch Size") + .description("The number of incoming FlowFiles to process in a single execution of this processor.") + .required(true) + .defaultValue("25") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor + .Builder().name("Idle Connection Expiration") + .description("The amount of time a connection should be held open without being used before closing the connection.") + .required(true) + .defaultValue("5 seconds") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor MSG_PRIORITY = new PropertyDescriptor + .Builder().name("Message Priority") + .description("The priority for the Syslog messages, excluding < >.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor MSG_VERSION = new PropertyDescriptor + .Builder().name("Message Version") + .description("The version for the Syslog messages.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor MSG_TIMESTAMP = new PropertyDescriptor + .Builder().name("Message Timestamp") + .description("The timestamp for the Syslog messages. 
The timestamp can be an RFC5424 timestamp with a format of " + + "\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", \" or it can be an RFC3164 timestamp " + + "with a format of \"MMM d HH:mm:ss\".") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor MSG_HOSTNAME = new PropertyDescriptor + .Builder().name("Message Hostname") + .description("The hostname for the Syslog messages.") + .required(true) + .defaultValue("${hostname(true)}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor MSG_BODY = new PropertyDescriptor + .Builder().name("Message Body") + .description("The body for the Syslog messages.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are sent successfully to Syslog are sent out this relationship.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to send to Syslog are sent out this relationship.") + .build(); + public static final Relationship REL_INVALID = new Relationship.Builder() + .name("invalid") + .description("FlowFiles that do not form a valid Syslog message are sent out this relationship.") + .build(); + + private Set relationships; + private List descriptors; + + private volatile BlockingQueue bufferPool; + private volatile BlockingQueue senderPool; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(HOSTNAME); + descriptors.add(PROTOCOL); + descriptors.add(PORT); + descriptors.add(IDLE_EXPIRATION); + 
descriptors.add(SEND_BUFFER_SIZE); + descriptors.add(BATCH_SIZE); + descriptors.add(CHARSET); + descriptors.add(MSG_PRIORITY); + descriptors.add(MSG_VERSION); + descriptors.add(MSG_TIMESTAMP); + descriptors.add(MSG_HOSTNAME); + descriptors.add(MSG_BODY); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_INVALID); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + this.bufferPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); + for (int i=0; i < context.getMaxConcurrentTasks(); i++) { + this.bufferPool.offer(ByteBuffer.allocate(bufferSize)); + } + + // create a pool of senders based on the number of concurrent tasks for this processor + this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); + for (int i=0; i < context.getMaxConcurrentTasks(); i++) { + senderPool.offer(createSender(context, bufferPool)); + } + } + + protected ChannelSender createSender(final ProcessContext context, final BlockingQueue bufferPool) throws IOException { + final int port = context.getProperty(PORT).asInteger(); + final String host = context.getProperty(HOSTNAME).getValue(); + final String protocol = context.getProperty(PROTOCOL).getValue(); + final String charSet = context.getProperty(CHARSET).getValue(); + return createSender(protocol, host, port, Charset.forName(charSet), bufferPool); + } + + // visible for testing to override and provide a mock sender if desired + protected ChannelSender createSender(final String 
protocol, final String host, final int port, final Charset charset, final BlockingQueue bufferPool) + throws IOException { + if (protocol.equals(UDP_VALUE.getValue())) { + return new DatagramChannelSender(host, port, bufferPool, charset); + } else { + return new SocketChannelSender(host, port, bufferPool, charset); + } + } + + @OnStopped + public void onStopped() { + ChannelSender sender = senderPool.poll(); + while (sender != null) { + sender.close(); + sender = senderPool.poll(); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final String protocol = context.getProperty(PROTOCOL).getValue(); + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + + final List flowFiles = session.get(batchSize); + if (flowFiles == null || flowFiles.isEmpty()) { + final List putBack = new ArrayList<>(); + final long expirationThreshold = context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue(); + + // if a connection hasn't been used with in the threshold then it gets closed + ChannelSender sender; + while ((sender = senderPool.poll()) != null) { + if (System.currentTimeMillis() > (sender.lastUsed + expirationThreshold)) { + getLogger().debug("Closing idle connection..."); + sender.close(); + } else { + putBack.add(sender); + } + } + + // re-queue senders that weren't idle, but if the queue is full then close the sender + for (ChannelSender putBackSender : putBack) { + boolean returned = senderPool.offer(putBackSender); + if (!returned) { + putBackSender.close(); + } + } + return; + } + + // get a sender from the pool, or create a new one if the pool is empty + // if we can't create a new connection then route flow files to failure and yield + ChannelSender sender = senderPool.poll(); + if (sender == null) { + try { + getLogger().debug("No available connections, creating a new one..."); + sender = createSender(context, bufferPool); + } catch (IOException e) { + for 
(final FlowFile flowFile : flowFiles) { + getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", + new Object[]{flowFile}, e); + session.transfer(flowFile, REL_FAILURE); + } + context.yield(); + return; + } + } + + final ObjectHolder exceptionHolder = new ObjectHolder<>(null); + try { + for (FlowFile flowFile : flowFiles) { + final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue(); + final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue(); + final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); + final String hostname = context.getProperty(MSG_HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); + final String body = context.getProperty(MSG_BODY).evaluateAttributeExpressions(flowFile).getValue(); + + final StringBuilder messageBuilder = new StringBuilder(); + messageBuilder.append("<").append(priority).append(">"); + if (version != null) { + messageBuilder.append(version).append(" "); + } + messageBuilder.append(timestamp).append(" ").append(hostname).append(" ").append(body); + + final String fullMessage = messageBuilder.toString(); + getLogger().debug(fullMessage); + + if (isValid(fullMessage)) { + try { + // now that we validated, add a new line if doing TCP + if (protocol.equals(TCP_VALUE.getValue())) { + messageBuilder.append('\n'); + } + + sender.send(messageBuilder.toString()); + getLogger().info("Transferring {} to success", new Object[]{flowFile}); + session.transfer(flowFile, REL_SUCCESS); + } catch (IOException e) { + getLogger().error("Transferring {} to failure", new Object[]{flowFile}, e); + session.transfer(flowFile, REL_FAILURE); + exceptionHolder.set(e); + } + } else { + getLogger().info("Transferring {} to invalid", new Object[]{flowFile}); + session.transfer(flowFile, REL_INVALID); + } + } + } finally { + // if the connection is 
still open and no IO errors happened then try to return, if pool is full then close + if (sender.isConnected() && exceptionHolder.get() == null) { + boolean returned = senderPool.offer(sender); + if (!returned) { + sender.close(); + } + } else { + // probably already closed here, but quietly close anyway to be safe + sender.close(); + } + + } + + } + + private boolean isValid(final String message) { + for (Pattern pattern : SyslogParser.MESSAGE_PATTERNS) { + Matcher matcher = pattern.matcher(message); + if (matcher.matches()) { + return true; + } + } + return false; + } + + /** + * Base class for sending messages over a channel. + */ + public static abstract class ChannelSender { + + final int port; + final String host; + final BlockingQueue bufferPool; + final Charset charset; + volatile long lastUsed; + + ChannelSender(final String host, final int port, final BlockingQueue bufferPool, final Charset charset) throws IOException { + this.port = port; + this.host = host; + this.bufferPool = bufferPool; + this.charset = charset; + } + + public void send(final String message) throws IOException { + final byte[] bytes = message.getBytes(charset); + + boolean shouldReturn = true; + ByteBuffer buffer = bufferPool.poll(); + if (buffer == null) { + buffer = ByteBuffer.allocate(bytes.length); + shouldReturn = false; + } else if (buffer.limit() < bytes.length) { + // we need a large buffer so return the one we got and create a new bigger one + bufferPool.offer(buffer); + buffer = ByteBuffer.allocate(bytes.length); + shouldReturn = false; + } + + try { + buffer.clear(); + buffer.put(bytes); + buffer.flip(); + write(buffer); + lastUsed = System.currentTimeMillis(); + } finally { + if (shouldReturn) { + bufferPool.offer(buffer); + } + } + } + + // write the given buffer to the underlying channel + abstract void write(ByteBuffer buffer) throws IOException; + + // returns true if the underlying channel is connected + abstract boolean isConnected(); + + // close the underlying 
channel + abstract void close(); + } + + /** + * Sends messages over a DatagramChannel. + */ + static class DatagramChannelSender extends ChannelSender { + + final DatagramChannel channel; + + DatagramChannelSender(final String host, final int port, final BlockingQueue bufferPool, final Charset charset) throws IOException { + super(host, port, bufferPool, charset); + this.channel = DatagramChannel.open(); + this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port)); + } + + @Override + public void write(ByteBuffer buffer) throws IOException { + while (buffer.hasRemaining()) { + channel.write(buffer); + } + } + + @Override + boolean isConnected() { + return channel != null && channel.isConnected(); + } + + @Override + public void close() { + IOUtils.closeQuietly(channel); + } + } + + /** + * Sends messages over a SocketChannel. + */ + static class SocketChannelSender extends ChannelSender { + + final SocketChannel channel; + + SocketChannelSender(final String host, final int port, final BlockingQueue bufferPool, final Charset charset) throws IOException { + super(host, port, bufferPool, charset); + this.channel = SocketChannel.open(); + this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port)); + } + + @Override + public void write(ByteBuffer buffer) throws IOException { + while (buffer.hasRemaining()) { + channel.write(buffer); + } + } + + @Override + boolean isConnected() { + return channel != null && channel.isConnected(); + } + + @Override + public void close() { + IOUtils.closeQuietly(channel); + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java new file mode 100644 index 0000000000..3d06dbee42 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +/** + * Encapsulates the parsed information for a single Syslog event. 
/**
 * Immutable holder for the fields parsed out of a single Syslog message.
 *
 * Instances are created through {@link Builder}. Every field is optional and is
 * {@code null} (or {@code false} for {@link #isValid()}) when the builder never set it.
 * The raw message array is stored and returned by reference, it is not copied.
 */
public class SyslogEvent {

    private final String priority;
    private final String severity;
    private final String facility;
    private final String version;
    private final String timeStamp;
    private final String hostName;
    private final String sender;
    private final String msgBody;
    private final String fullMessage;
    private final byte[] rawMessage;
    private final boolean valid;

    private SyslogEvent(final Builder builder) {
        this.priority = builder.priority;
        this.severity = builder.severity;
        this.facility = builder.facility;
        this.version = builder.version;
        this.timeStamp = builder.timeStamp;
        this.hostName = builder.hostName;
        this.sender = builder.sender;
        this.msgBody = builder.msgBody;
        this.fullMessage = builder.fullMessage;
        this.rawMessage = builder.rawMessage;
        this.valid = builder.valid;
    }

    /** @return the priority value as given to the builder, or null if not set */
    public String getPriority() {
        return priority;
    }

    /** @return the severity as given to the builder, or null if not set */
    public String getSeverity() {
        return severity;
    }

    /** @return the facility as given to the builder, or null if not set */
    public String getFacility() {
        return facility;
    }

    /** @return the version as given to the builder, or null if not set */
    public String getVersion() {
        return version;
    }

    /** @return the timestamp text as given to the builder, or null if not set */
    public String getTimeStamp() {
        return timeStamp;
    }

    /** @return the host name as given to the builder, or null if not set */
    public String getHostName() {
        return hostName;
    }

    /** @return the sender as given to the builder, or null if not set */
    public String getSender() {
        return sender;
    }

    /** @return the message body as given to the builder, or null if not set */
    public String getMsgBody() {
        return msgBody;
    }

    /** @return the full message text as given to the builder, or null if not set */
    public String getFullMessage() {
        return fullMessage;
    }

    /** @return the same array instance that was handed to the builder (not a copy), or null if not set */
    public byte[] getRawMessage() {
        return rawMessage;
    }

    /** @return the valid flag as given to the builder; false if not set */
    public boolean isValid() {
        return valid;
    }

    /**
     * Mutable, reusable builder for {@link SyslogEvent}. Each setter returns this
     * builder so calls can be chained; {@link #reset()} returns it to its initial state.
     */
    public static final class Builder {
        private String priority;
        private String severity;
        private String facility;
        private String version;
        private String timeStamp;
        private String hostName;
        private String sender;
        private String msgBody;
        private String fullMessage;
        private byte[] rawMessage;
        private boolean valid;

        /**
         * Clears every field so the builder can be reused for another event.
         * This includes the raw message bytes: leaving them in place would let a
         * reused builder attach the previous event's bytes to the next event.
         */
        public void reset() {
            this.priority = null;
            this.severity = null;
            this.facility = null;
            this.version = null;
            this.timeStamp = null;
            this.hostName = null;
            this.sender = null;
            this.msgBody = null;
            this.fullMessage = null;
            this.rawMessage = null;
            this.valid = false;
        }

        public Builder priority(String priority) {
            this.priority = priority;
            return this;
        }

        public Builder severity(String severity) {
            this.severity = severity;
            return this;
        }

        public Builder facility(String facility) {
            this.facility = facility;
            return this;
        }

        public Builder version(String version) {
            this.version = version;
            return this;
        }

        public Builder timestamp(String timestamp) {
            this.timeStamp = timestamp;
            return this;
        }

        public Builder hostname(String hostName) {
            this.hostName = hostName;
            return this;
        }

        public Builder sender(String sender) {
            this.sender = sender;
            return this;
        }

        public Builder msgBody(String msgBody) {
            this.msgBody = msgBody;
            return this;
        }

        public Builder fullMessage(String fullMessage) {
            this.fullMessage = fullMessage;
            return this;
        }

        /** Stores the given array by reference; the caller must not modify it afterwards. */
        public Builder rawMessage(byte[] rawMessage) {
            this.rawMessage = rawMessage;
            return this;
        }

        public Builder valid(boolean valid) {
            this.valid = valid;
            return this;
        }

        /** @return a new event carrying a snapshot of the builder's current field values */
        public SyslogEvent build() {
            return new SyslogEvent(this);
        }
    }

}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.regex.MatchResult; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Parses a Syslog message from a ByteBuffer into a SyslogEvent instance. + * + * The Syslog regular expressions below were adapted from the Apache Flume project. 
+ */ +public class SyslogParser { + + public static final String SYSLOG_MSG_RFC5424_0 = + "(?:\\<(\\d{1,3})\\>)" + // priority + "(?:(\\d)?\\s?)" + // version + /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */ + "(?:" + + "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" + + "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp + "\\s" + // separator + "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null) + "\\s" + // separator + "(.*)$"; // body + + public static final String SYSLOG_MSG_RFC3164_0 = + "(?:\\<(\\d{1,3})\\>)" + + "(?:(\\d)?\\s?)" + // version + // stamp MMM d HH:mm:ss, single digit date has two spaces + "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" + + "\\s" + // separator + "([\\w][\\w\\d\\.@-]*)" + // host + "\\s(.*)$"; // body + + public static final Collection MESSAGE_PATTERNS; + static { + List patterns = new ArrayList<>(); + patterns.add(Pattern.compile(SYSLOG_MSG_RFC5424_0)); + patterns.add(Pattern.compile(SYSLOG_MSG_RFC3164_0)); + MESSAGE_PATTERNS = Collections.unmodifiableList(patterns); + } + + // capture group positions from the above message patterns + public static final int SYSLOG_PRIORITY_POS = 1; + public static final int SYSLOG_VERSION_POS = 2; + public static final int SYSLOG_TIMESTAMP_POS = 3; + public static final int SYSLOG_HOSTNAME_POS = 4; + public static final int SYSLOG_BODY_POS = 5; + + private Charset charset; + + public SyslogParser(final Charset charset) { + this.charset = charset; + } + + /** + * Parses a SyslogEvent from a byte buffer. + * + * @param buffer a byte buffer containing a syslog message + * @return a SyslogEvent parsed from the byte array + */ + public SyslogEvent parseEvent(final ByteBuffer buffer) { + return parseEvent(buffer, null); + } + + /** + * Parses a SyslogEvent from a byte buffer. 
+ * + * @param buffer a byte buffer containing a syslog message + * @param sender the hostname of the syslog server that sent the message + * @return a SyslogEvent parsed from the byte array + */ + public SyslogEvent parseEvent(final ByteBuffer buffer, final String sender) { + if (buffer == null) { + return null; + } + if (buffer.position() != 0) { + buffer.flip(); + } + byte bytes[] = new byte[buffer.limit()]; + buffer.get(bytes, 0, buffer.limit()); + return parseEvent(bytes, sender); + } + + /** + * Parses a SyslogEvent from a byte array. + * + * @param bytes a byte array containing a syslog message + * @param sender the hostname of the syslog server that sent the message + * @return a SyslogEvent parsed from the byte array + */ + public SyslogEvent parseEvent(final byte[] bytes, final String sender) { + if (bytes == null || bytes.length == 0) { + return null; + } + + // remove trailing new line before parsing + int length = bytes.length; + if (bytes[length - 1] == '\n') { + length = length - 1; + } + + final String message = new String(bytes, 0, length, charset); + + final SyslogEvent.Builder builder = new SyslogEvent.Builder() + .valid(false).fullMessage(message).rawMessage(bytes).sender(sender); + + for (Pattern pattern : MESSAGE_PATTERNS) { + final Matcher matcher = pattern.matcher(message); + if (!matcher.matches()) { + continue; + } + + final MatchResult res = matcher.toMatchResult(); + for (int grp = 1; grp <= res.groupCount(); grp++) { + String value = res.group(grp); + if (grp == SYSLOG_TIMESTAMP_POS) { + builder.timestamp(value); + } else if (grp == SYSLOG_HOSTNAME_POS) { + builder.hostname(value); + } else if (grp == SYSLOG_PRIORITY_POS) { + int pri = Integer.parseInt(value); + int sev = pri % 8; + int facility = pri / 8; + builder.priority(value); + builder.severity(String.valueOf(sev)); + builder.facility(String.valueOf(facility)); + } else if (grp == SYSLOG_VERSION_POS) { + builder.version(value); + } else if (grp == SYSLOG_BODY_POS) { + 
builder.msgBody(value); + } + } + + builder.valid(true); + break; + } + + // either invalid w/original msg, or fully parsed event + return builder.build(); + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 8507c96efa..60379edba7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -44,6 +44,7 @@ org.apache.nifi.processors.standard.InvokeHTTP org.apache.nifi.processors.standard.GetJMSQueue org.apache.nifi.processors.standard.GetJMSTopic org.apache.nifi.processors.standard.ListenHTTP +org.apache.nifi.processors.standard.ListenSyslog org.apache.nifi.processors.standard.ListenUDP org.apache.nifi.processors.standard.ListSFTP org.apache.nifi.processors.standard.LogAttribute @@ -58,6 +59,7 @@ org.apache.nifi.processors.standard.PutFTP org.apache.nifi.processors.standard.PutJMS org.apache.nifi.processors.standard.PutSFTP org.apache.nifi.processors.standard.PutSQL +org.apache.nifi.processors.standard.PutSyslog org.apache.nifi.processors.standard.ReplaceText org.apache.nifi.processors.standard.ReplaceTextWithMapping org.apache.nifi.processors.standard.RouteOnAttribute diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java new file mode 100644 index 0000000000..0e0d972b8c --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.io.nio.BufferPool; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.standard.util.SyslogEvent; +import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +public class TestListenSyslog { + + static final Logger LOGGER = 
LoggerFactory.getLogger(TestListenSyslog.class); + + static final String PRI = "34"; + static final String SEV = "2"; + static final String FAC = "4"; + static final String TIME = "Oct 13 15:43:23"; + static final String HOST = "localhost.home"; + static final String BODY = "some message"; + + static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n"; + static final String INVALID_MESSAGE = "this is not valid\n"; + + @Test + public void testUDP() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some UDP messages to the port in the background + final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all datagrams, or 30 seconds passed + try { + int numTransfered = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransfered < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + numTransfered = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + 
@Test + public void testTCPSingleConnection() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds passed + try { + int numTransfered = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransfered < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, numTransfered); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile); + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testTCPMultipleConnection() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = 
runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds passed + try { + int numTransfered = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransfered < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, numTransfered); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile); + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testInvalid() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 10; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE)); + sender.setDaemon(true); + sender.start(); + + // 
call onTrigger until we read all messages, or 30 seconds passed + try { + int numTransfered = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransfered < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(50); + proc.onTrigger(context, processSessionFactory); + numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, numTransfered); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testErrorQueue() { + final SyslogEvent event1 = Mockito.mock(SyslogEvent.class); + Mockito.when(event1.getRawMessage()).thenThrow(new ProcessException("ERROR")); + + final SyslogEvent event2 = new SyslogEvent.Builder() + .facility("fac").severity("sev") + .fullMessage("abc").hostname("host") + .msgBody("body").timestamp("123").valid(true) + .rawMessage("abc".getBytes(Charset.forName("UTF-8"))) + .build(); + + final MockProcessor proc = new MockProcessor(Arrays.asList(event1, event2)); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PORT, "12345"); + + // should keep re-processing event1 from the error queue + runner.run(3); + runner.assertTransferCount(ListenSyslog.REL_INVALID, 0); + runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0); + } + + + private void checkFlowFile(MockFlowFile flowFile) { + flowFile.assertContentEquals(VALID_MESSAGE); + Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key())); + Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key())); + Assert.assertEquals(FAC, flowFile.getAttribute(ListenSyslog.SyslogAttributes.FACILITY.key())); + Assert.assertEquals(TIME, flowFile.getAttribute(ListenSyslog.SyslogAttributes.TIMESTAMP.key())); + Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key())); + 
Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key())); + Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key())); + } + + /** + * Sends the given message as a UDP datagram to localhost on the given port numMessages times, + * sleeping for delay milliseconds after each send. Intended to be run on a background thread. + */ + public static final class DatagramSender implements Runnable { + + final int port; + final int numMessages; + final long delay; + final String message; + + public DatagramSender(int port, int numMessages, long delay, String message) { + this.port = port; + this.numMessages = numMessages; + this.delay = delay; + this.message = message; + } + + @Override + public void run() { + byte[] bytes = message.getBytes(Charset.forName("UTF-8")); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + + try (DatagramChannel channel = DatagramChannel.open()) { + channel.connect(new InetSocketAddress("localhost", port)); + for (int i=0; i < numMessages; i++) { + // refill the shared buffer with the same payload for every datagram + buffer.clear(); + buffer.put(bytes); + buffer.flip(); + + while(buffer.hasRemaining()) { + channel.write(buffer); + } + + Thread.sleep(delay); + } + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } catch (InterruptedException e) { + // restore the interrupt status so whoever owns this thread can still observe it + Thread.currentThread().interrupt(); + LOGGER.error(e.getMessage(), e); + } + } + } + + /** + * Sends a given number of messages to the given port over a single TCP connection.
+ */ + public static final class SingleConnectionSocketSender implements Runnable { + + final int port; + final int numMessages; + final long delay; + final String message; + + public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) { + this.port = port; + this.numMessages = numMessages; + this.delay = delay; + this.message = message; + } + + @Override + public void run() { + byte[] bytes = message.getBytes(Charset.forName("UTF-8")); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + + // one connection is opened up front and reused for every message + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", port)); + + for (int i=0; i < numMessages; i++) { + buffer.clear(); + buffer.put(bytes); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(delay); + } + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } catch (InterruptedException e) { + // restore the interrupt status so whoever owns this thread can still observe it + Thread.currentThread().interrupt(); + LOGGER.error(e.getMessage(), e); + } + } + } + + /** + * Sends a given number of messages to the given port, opening a new TCP connection for each message.
+ */ + public static final class MultiConnectionSocketSender implements Runnable { + + final int port; + final int numMessages; + final long delay; + final String message; + + public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) { + this.port = port; + this.numMessages = numMessages; + this.delay = delay; + this.message = message; + } + + @Override + public void run() { + byte[] bytes = message.getBytes(Charset.forName("UTF-8")); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + + for (int i=0; i < numMessages; i++) { + // a fresh connection per message; try-with-resources closes it before the next iteration + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", port)); + + buffer.clear(); + buffer.put(bytes); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(delay); + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } catch (InterruptedException e) { + // restore the interrupt status and stop sending: channel I/O on an interrupted thread would only fail + Thread.currentThread().interrupt(); + LOGGER.error(e.getMessage(), e); + return; + } + } + } + } + + // A mock version of ListenSyslog that will queue the provided events + private static class MockProcessor extends ListenSyslog { + + private final List<SyslogEvent> eventList; + + public MockProcessor(List<SyslogEvent> eventList) { + this.eventList = eventList; + } + + // returns a reader that never opens a socket; each run() offers every canned event to the processor's queue + @Override + protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException { + return new ChannelReader() { + @Override + public void open(int port, int maxBufferSize) throws IOException { + + } + + @Override + public int getPort() { + return 0; + } + + @Override + public void stop() { + + } + + @Override + public void close() { + + } + + @Override + public void run() { + for (SyslogEvent event : eventList) { + syslogEvents.offer(event); + } + } + }; + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java new file mode 100644 index 0000000000..40a91231b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class TestPutSyslog { + + private MockCollectingSender sender; + private MockPutSyslog proc; + private TestRunner runner; + + @Before + public void setup() throws IOException { + sender = new MockCollectingSender(); + proc = new MockPutSyslog(sender); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutSyslog.HOSTNAME, "localhost"); + runner.setProperty(PutSyslog.PORT, "12345"); + } + + @Test + public void testValidMessageStaticPropertiesUdp() { + final String pri = "34"; + final String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_VERSION, version); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + Assert.assertEquals(expectedMessage, sender.messages.get(0)); + } + + @Test + public void testValidMessageStaticPropertiesTcp() { + final String pri = "34"; + final 
String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE); + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_VERSION, version); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", "")); + } + + @Test + public void testValidMessageStaticPropertiesNoVersion() { + final String pri = "34"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body; + + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + Assert.assertEquals(expectedMessage, sender.messages.get(0)); + } + + @Test + public void testValidMessageELProperties() { + final String pri = "34"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' 
failed for lonvick on /dev/pts/8"; + + final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body; + + runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}"); + runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("syslog.priority", pri); + attributes.put("syslog.timestamp", stamp); + attributes.put("syslog.hostname", host); + attributes.put("syslog.body", body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + Assert.assertEquals(expectedMessage, sender.messages.get(0)); + } + + /** + * A timestamp attribute that is not a syslog timestamp must route the FlowFile to REL_INVALID + * without anything being handed to the sender. + */ + @Test + public void testInvalidMessageELProperties() { + final String pri = "34"; + final String stamp = "not-a-timestamp"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}"); + runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("syslog.priority", pri); + attributes.put("syslog.timestamp", stamp); + attributes.put("syslog.hostname", host); + attributes.put("syslog.body", body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID, 1); + Assert.assertEquals(0, sender.messages.size()); + } + + @Test + public void testIOExceptionOnSend() throws IOException { + final String pri = "34"; + final String version = "1"; + final String stamp = 
"2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + proc = new MockPutSyslog(new MockErrorSender()); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutSyslog.HOSTNAME, "localhost"); + runner.setProperty(PutSyslog.PORT, "12345"); + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_VERSION, version); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 1); + Assert.assertEquals(0, sender.messages.size()); + } + + @Test + public void testIOExceptionCreatingConnection() throws IOException { + final String pri = "34"; + final String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + Processor proc = new MockCreationErrorPutSyslog(new MockErrorSender(), 1); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutSyslog.HOSTNAME, "localhost"); + runner.setProperty(PutSyslog.PORT, "12345"); + runner.setProperty(PutSyslog.BATCH_SIZE, "1"); + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_VERSION, version); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + // the first run will throw IOException when calling send so the connection won't be re-queued + // the second run will try to create a new connection but throw an exception which should be caught and route files to failure + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + 
runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(2); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 2); + Assert.assertEquals(0, sender.messages.size()); + } + + /** + * Despite its name this test expects success: a 4096-character body must still be + * routed to REL_SUCCESS and handed to the sender exactly once. + */ + @Test + public void testLargeMessageFailure() { + final String pri = "34"; + final String stamp = "2015-10-15T22:14:15.003Z"; + final String host = "mymachine.example.com"; + + final StringBuilder bodyBuilder = new StringBuilder(4096); + for (int i=0; i < 4096; i++) { + bodyBuilder.append("a"); + } + + runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}"); + runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("syslog.priority", pri); + attributes.put("syslog.timestamp", stamp); + attributes.put("syslog.hostname", host); + attributes.put("syslog.body", bodyBuilder.toString()); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes); + runner.run(); + + // should have dynamically created a larger buffer + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + } + + @Test + public void testNoIncomingData() { + runner.setProperty(PutSyslog.MSG_PRIORITY, "10"); + runner.setProperty(PutSyslog.MSG_VERSION, "1"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "2003-10-11T22:14:15.003Z"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "localhost"); + runner.setProperty(PutSyslog.MSG_BODY, "test"); + + // queue one file but run several times to test no incoming data + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(5); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + } + + /** + * With a batch size of 10 and 15 queued FlowFiles, the first run must send 10 messages + * and the second run the remaining 5. + */ + @Test + public void testBatchingFlowFiles() { + runner.setProperty(PutSyslog.BATCH_SIZE, "10"); + runner.setProperty(PutSyslog.MSG_PRIORITY, 
"${syslog.priority}"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}"); + runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("syslog.priority", "10"); + attributes.put("syslog.timestamp", "2015-10-11T22:14:15.003Z"); + attributes.put("syslog.hostname", "my.host.name"); + attributes.put("syslog.body", "blah blah blah"); + + for (int i=0; i < 15; i++) { + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes); + } + + runner.run(); + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 10); + Assert.assertEquals(10, sender.messages.size()); + + runner.run(); + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 15); + Assert.assertEquals(15, sender.messages.size()); + } + + // Mock processor that always returns the sender supplied at construction + static class MockPutSyslog extends PutSyslog { + + ChannelSender mockSender; + + public MockPutSyslog(ChannelSender sender) { + this.mockSender = sender; + } + + @Override + protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException { + return mockSender; + } + } + + // Mock processor to test exception when creating new senders + static class MockCreationErrorPutSyslog extends PutSyslog { + + int numSendersCreated; + int numSendersAllowed; + ChannelSender mockSender; + + public MockCreationErrorPutSyslog(ChannelSender sender, int numSendersAllowed) { + this.mockSender = sender; + this.numSendersAllowed = numSendersAllowed; + } + + @Override + protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException { + // hand out the mock sender until the allowance is used up, then fail every further request + if (numSendersCreated >= numSendersAllowed) { + throw new IOException("too many senders"); + } + numSendersCreated++; + return mockSender; + } + } + + // Mock sender that saves any messages 
passed to send() + static class MockCollectingSender extends PutSyslog.ChannelSender { + + List<String> messages = new ArrayList<>(); + + public MockCollectingSender() throws IOException { + super("myhost", 0, new LinkedBlockingQueue<ByteBuffer>(1), Charset.forName("UTF-8")); + this.bufferPool.offer(ByteBuffer.allocate(1024)); + } + + @Override + public void send(String message) throws IOException { + // record the message first, then run the real send path on top of the no-op write() below + messages.add(message); + super.send(message); + } + + @Override + void write(ByteBuffer buffer) throws IOException { + + } + + @Override + boolean isConnected() { + return true; + } + + @Override + void close() { + + } + } + + // Mock sender that throws IOException on calls to write() or send() + static class MockErrorSender extends PutSyslog.ChannelSender { + + public MockErrorSender() throws IOException { + super(null, 0, null, null); + } + + @Override + public void send(String message) throws IOException { + throw new IOException("error"); + } + + @Override + void write(ByteBuffer buffer) throws IOException { + throw new IOException("error"); + } + + @Override + boolean isConnected() { + return false; + } + + @Override + void close() { + + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java new file mode 100644 index 0000000000..d1b1bbf598 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +public class TestSyslogParser { + + static final Charset CHARSET = Charset.forName("UTF-8"); + + private SyslogParser parser; + + @Before + public void setup() { + parser = new SyslogParser(CHARSET); + } + + @Test + public void testRFC3164SingleDigitDay() { + final String pri = "10"; + final String stamp = "Oct 1 13:14:04"; + final String host = "my.host.com"; + final String body = "some body message"; + final String message = "<" + pri + ">" + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("2", event.getSeverity()); + Assert.assertEquals("1", event.getFacility()); + Assert.assertNull(event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); 
+ } + + @Test + public void testRFC3164DoubleDigitDay() { + final String pri = "31"; + final String stamp = "Oct 13 14:14:43"; + final String host = "localhost"; + final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000"; + final String message = "<" + pri + ">" + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("7", event.getSeverity()); + Assert.assertEquals("3", event.getFacility()); + Assert.assertNull(event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testRFC3164WithVersion() { + final String pri = "31"; + final String version = "1"; + final String stamp = "Oct 13 14:14:43"; + final String host = "localhost"; + final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000"; + final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("7", event.getSeverity()); + Assert.assertEquals("3", event.getFacility()); + Assert.assertEquals(version, event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, 
event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testRFC5424WithVersion() { + final String pri = "34"; + final String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("2", event.getSeverity()); + Assert.assertEquals("4", event.getFacility()); + Assert.assertEquals(version, event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testRFC5424WithoutVersion() { + final String pri = "34"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String message = "<" + pri + ">" + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("2", event.getSeverity()); + Assert.assertEquals("4", event.getFacility()); + 
Assert.assertNull(event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); + } + + /** + * A message that ends with a newline must still parse as a valid event. + */ + @Test + public void testTrailingNewLine() { + final String message = "<31>Oct 13 15:43:23 localhost.home some message\n"; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertTrue(event.isValid()); + } + + /** + * Parses a mix of RFC 3164, RFC 5424 and common non-standard messages and expects each to be valid. + */ + @Test + public void testVariety() { + final List<String> messages = new ArrayList<>(); + + // supported examples from RFC 3164 + messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " + + "lonvick on /dev/pts/8"); + messages.add("<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!"); + messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " + + "It's time to make the do-nuts. 
%% Ingredients: Mix=OK, Jelly=OK # " + + "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " + + "Conveyer1=OK, Conveyer2=OK # %%"); + messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " + + "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!"); + + // supported examples from RFC 5424 + messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " + + "ID47 - BOM'su root' failed for lonvick on /dev/pts/8"); + messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " + + "8710 - - %% It's time to make the do-nuts."); + + // non-standard (but common) messages (RFC3339 dates, no version digit) + messages.add("<13>2003-08-24T05:14:15Z localhost snarf?"); + messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!"); + + for (final String message : messages) { + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertTrue(event.isValid()); + } + } + + @Test + public void testInvalidPriority() { + final String message = "10 Oct 13 14:14:43 localhost some body of the message"; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertFalse(event.isValid()); + Assert.assertEquals(message, event.getFullMessage()); + } + + @Test + public void testParseWithSender() { + final String sender = "127.0.0.1"; + final String message = "<31>Oct 13 15:43:23 localhost.home some message\n"; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer, sender); + Assert.assertNotNull(event); + 
Assert.assertTrue(event.isValid()); + Assert.assertEquals(sender, event.getSender()); + } +} diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 841818aeaf..d48b755422 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -46,7 +46,7 @@ nifi-image-bundle nifi-avro-bundle nifi-couchbase-bundle - + @@ -131,4 +131,4 @@ - + \ No newline at end of file