NIFI-1221: Support batching of Syslog messages

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2015-11-25 17:21:00 -05:00 committed by Bryan Bende
parent c59087bc3a
commit e5281f1fc1
6 changed files with 676 additions and 176 deletions

View File

@ -31,6 +31,7 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -47,16 +48,19 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@ -70,6 +74,7 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@ -93,6 +98,7 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
@WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."),
@WritesAttribute(attribute="syslog.port", description="The port over which the Syslog message was received."),
@WritesAttribute(attribute = "mime.type", description = "The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
@SeeAlso({PutSyslog.class, ParseSyslog.class})
public class ListenSyslog extends AbstractSyslogProcessor {
public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
@ -120,6 +126,31 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.defaultValue("2")
.required(true)
.build();
// Upper bound on the number of Syslog events combined into one FlowFile.
// Must be a positive integer; the default of 1 means no batching takes place.
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size")
.description(
"The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with "
+ "the <Message Delimiter> up to this configured maximum number of messages")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("1")
.required(true)
.build();
// Separator written between consecutive messages that share a FlowFile.
// The default value is the two-character escape sequence backslash + 'n'; onScheduled
// replaces the escapes for newline, carriage return and tab with the real characters
// before encoding the delimiter to bytes.
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.description("Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\n")
.required(true)
.build();
// Toggles parsing of each received message. Restricted to the literal values
// "true" and "false"; customValidate rejects "true" when Max Batch Size is greater than 1.
public static final PropertyDescriptor PARSE_MESSAGES = new PropertyDescriptor.Builder()
.name("Parse Messages")
.description("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only " +
"contain the sender, protocol, and port, and no additional attributes.")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -133,11 +164,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private volatile BufferPool bufferPool;
private volatile ChannelReader channelReader;
private volatile SyslogParser parser;
private volatile BlockingQueue<SyslogEvent> syslogEvents;
private volatile BlockingQueue<SyslogEvent> errorEvents;
private volatile BlockingQueue<ByteBuffer> bufferPool;
private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new LinkedBlockingQueue<>(10);
private volatile BlockingQueue<RawSyslogEvent> errorEvents = new LinkedBlockingQueue<>();
private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents.
@Override
protected void init(final ProcessorInitializationContext context) {
@ -147,6 +179,9 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(MAX_CONNECTIONS);
descriptors.add(MAX_BATCH_SIZE);
descriptors.add(MESSAGE_DELIMITER);
descriptors.add(PARSE_MESSAGES);
descriptors.add(CHARSET);
this.descriptors = Collections.unmodifiableList(descriptors);
@ -162,13 +197,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
// since properties were changed, clear any events that were queued
// if we are changing the protocol, the events that we may have queued up are no longer valid, as they
// were received using a different protocol and may be from a completely different source
if (PROTOCOL.equals(descriptor)) {
if (syslogEvents != null) {
syslogEvents.clear();
}
@ -176,6 +213,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
errorEvents.clear();
}
}
}
/**
 * Cross-property validation: parsing and batching are mutually exclusive.
 *
 * @param validationContext gives access to the currently configured property values
 * @return a collection holding one invalid result when Parse Messages is enabled while
 *         Max Batch Size exceeds 1; otherwise an empty collection
 */
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    final List<ValidationResult> problems = new ArrayList<>();

    // Batch size is inspected first; the parse flag is only read when batching is active.
    final int configuredBatchSize = validationContext.getProperty(MAX_BATCH_SIZE).asInteger();
    if (configuredBatchSize <= 1) {
        return problems;
    }

    final boolean parsingEnabled = validationContext.getProperty(PARSE_MESSAGES).asBoolean();
    if (!parsingEnabled) {
        return problems;
    }

    final ValidationResult conflict = new ValidationResult.Builder()
        .subject("Parse Messages")
        .input("true")
        .valid(false)
        .explanation("Cannot set Parse Messages to 'true' if Batch Size is greater than 1")
        .build();
    problems.add(conflict);
    return problems;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
@ -184,21 +232,26 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
final int maxConnections;
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
final String charsetName = context.getProperty(CHARSET).getValue();
messageDemarcatorBytes = msgDemarcator.getBytes(Charset.forName(charsetName));
final int maxConnections;
if (protocol.equals(UDP_VALUE.getValue())) {
maxConnections = 1;
} else {
maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
}
bufferPool = new LinkedBlockingQueue<>(maxConnections);
for (int i = 0; i < maxConnections; i++) {
bufferPool.offer(ByteBuffer.allocate(bufferSize));
}
parser = new SyslogParser(Charset.forName(charSet));
bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
syslogEvents = new LinkedBlockingQueue<>(10);
errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
// create either a UDP or TCP reader and call open() to bind to the given port
channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections);
channelReader = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections);
channelReader.open(port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelReader);
@ -207,13 +260,19 @@ public class ListenSyslog extends AbstractSyslogProcessor {
readerThread.start();
}
/**
 * Visible for testing.
 *
 * @return the parser currently held in this processor's {@code parser} field
 */
protected SyslogParser getParser() {
    return this.parser;
}
// visible for testing to be overridden and provide a mock ChannelReader if desired
protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
protected ChannelReader createChannelReader(final String protocol, final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents,
int maxConnections)
throws IOException {
if (protocol.equals(UDP_VALUE.getValue())) {
return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
return new DatagramChannelReader(bufferPool, syslogEvents, getLogger());
} else {
return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections);
return new SocketChannelReader(bufferPool, syslogEvents, getLogger(), maxConnections);
}
}
@ -230,66 +289,179 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// try to pull from the error queue first, if empty then pull from main queue
SyslogEvent initialEvent = errorEvents.poll();
if (initialEvent == null) {
initialEvent = syslogEvents.poll();
protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue) {
RawSyslogEvent rawSyslogEvent = null;
if (pollErrorQueue) {
rawSyslogEvent = errorEvents.poll();
}
// if nothing in either queue then yield and return
if (initialEvent == null) {
if (rawSyslogEvent == null) {
try {
if (longPoll) {
rawSyslogEvent = syslogEvents.poll(100, TimeUnit.MILLISECONDS);
} else {
rawSyslogEvent = syslogEvents.poll();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
return rawSyslogEvent;
}
/**
 * @return the number of events currently sitting in the error queue
 */
protected int getErrorQueueSize() {
    final int pending = this.errorEvents.size();
    return pending;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// poll the queue with a small timeout to avoid unnecessarily yielding below
RawSyslogEvent rawSyslogEvent = getMessage(true, true);
// if nothing in the queue then yield and return
if (rawSyslogEvent == null) {
context.yield();
return;
}
final SyslogEvent event = initialEvent;
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
final String port = context.getProperty(PORT).getValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final Map<String,String> attributes = new HashMap<>();
final Map<String, String> defaultAttributes = new HashMap<>(4);
defaultAttributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
defaultAttributes.put(SyslogAttributes.PORT.key(), port);
defaultAttributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
final int numAttributes = SyslogAttributes.values().length + 2;
final boolean shouldParse = context.getProperty(PARSE_MESSAGES).asBoolean();
final Map<String, FlowFile> flowFilePerSender = new HashMap<>();
final SyslogParser parser = getParser();
for (int i = 0; i < maxBatchSize; i++) {
SyslogEvent event = null;
// If this is our first iteration, we have already polled our queues. Otherwise, poll on each iteration.
if (i > 0) {
rawSyslogEvent = getMessage(false, false);
if (rawSyslogEvent == null) {
break;
}
}
final String sender = rawSyslogEvent.getSender();
FlowFile flowFile = flowFilePerSender.get(sender);
if (flowFile == null) {
flowFile = session.create();
flowFilePerSender.put(sender, flowFile);
}
if (shouldParse) {
boolean valid = true;
try {
event = parser.parseEvent(rawSyslogEvent.getRawMessage(), sender);
} catch (final ProcessException pe) {
getLogger().warn("Failed to parse Syslog event; routing to invalid");
valid = false;
}
// If the event is invalid, route it to 'invalid' and then stop.
// We create a separate FlowFile for this case instead of using 'flowFile',
// because the 'flowFile' object may already have data written to it.
if (!valid || !event.isValid()) {
FlowFile invalidFlowFile = session.create();
invalidFlowFile = session.putAllAttributes(invalidFlowFile, defaultAttributes);
if (sender != null) {
invalidFlowFile = session.putAttribute(invalidFlowFile, SyslogAttributes.SENDER.key(), sender);
}
try {
final byte[] rawBytes = rawSyslogEvent.getRawMessage();
invalidFlowFile = session.write(invalidFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(rawBytes);
}
});
} catch (final Exception e) {
getLogger().error("Failed to write contents of Syslog message to FlowFile due to {}; will re-queue message and try again", e);
errorEvents.offer(rawSyslogEvent);
session.remove(invalidFlowFile);
break;
}
session.transfer(invalidFlowFile, REL_INVALID);
break;
}
getLogger().trace(event.getFullMessage());
final Map<String, String> attributes = new HashMap<>(numAttributes);
attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
attributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
attributes.put(SyslogAttributes.PORT.key(), port);
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
}
final String transitUri = new StringBuilder().append(protocol).append("://").append(event.getSender())
.append(":").append(port).toString();
// figure out if we should write the bytes from the raw event or parsed event
final boolean writeDemarcator = (i > 0);
try {
// write the raw bytes of the message as the FlowFile content
flowFile = session.write(flowFile, new OutputStreamCallback() {
final byte[] rawMessage = (event == null) ? rawSyslogEvent.getRawMessage() : event.getRawMessage();
flowFile = session.append(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(event.getRawMessage());
public void process(final OutputStream out) throws IOException {
if (writeDemarcator) {
out.write(messageDemarcatorBytes);
}
out.write(rawMessage);
}
});
if (event.isValid()) {
getLogger().info("Transferring {} to success", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, transitUri);
} else {
getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
session.transfer(flowFile, REL_INVALID);
} catch (final Exception e) {
getLogger().error("Failed to write contents of Syslog message to FlowFile due to {}; will re-queue message and try again", e);
errorEvents.offer(rawSyslogEvent);
break;
}
} catch (ProcessException e) {
getLogger().error("Error processing Syslog message", e);
errorEvents.offer(event);
session.adjustCounter("Messages Received", 1L, false);
flowFilePerSender.put(sender, flowFile);
}
for (final Map.Entry<String, FlowFile> entry : flowFilePerSender.entrySet()) {
final String sender = entry.getKey();
FlowFile flowFile = entry.getValue();
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
getLogger().debug("No data written to FlowFile from Sender {}; removing FlowFile", new Object[] {sender});
continue;
}
final Map<String, String> newAttributes = new HashMap<>(defaultAttributes.size() + 1);
newAttributes.putAll(defaultAttributes);
newAttributes.put(SyslogAttributes.SENDER.key(), sender);
flowFile = session.putAllAttributes(flowFile, newAttributes);
getLogger().debug("Transferring {} to success", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append(protocol.toLowerCase()).append("://").append(senderHost).append(":").append(port).toString();
session.getProvenanceReporter().receive(flowFile, transitUri);
}
}
@ -313,18 +485,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
*/
public static class DatagramChannelReader implements ChannelReader {
private final BufferPool bufferPool;
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final BlockingQueue<ByteBuffer> bufferPool;
private final BlockingQueue<RawSyslogEvent> syslogEvents;
private final ProcessorLog logger;
private DatagramChannel datagramChannel;
private volatile boolean stopped = false;
private Selector selector;
public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
final ProcessorLog logger) {
public DatagramChannelReader(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) {
this.bufferPool = bufferPool;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
}
@ -360,17 +529,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
continue;
}
DatagramChannel channel = (DatagramChannel) key.channel();
SocketAddress sender;
SocketAddress socketAddress;
buffer.clear();
while (!stopped && (sender = channel.receive(buffer)) != null) {
final SyslogEvent event;
if (sender instanceof InetSocketAddress) {
event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString());
} else {
event = syslogParser.parseEvent(buffer);
while (!stopped && (socketAddress = channel.receive(buffer)) != null) {
String sender = "";
if (socketAddress instanceof InetSocketAddress) {
sender = ((InetSocketAddress) socketAddress).getAddress().toString();
}
logger.trace(event.getFullMessage());
syslogEvents.put(event); // block until space is available
// create a byte array from the buffer
buffer.flip();
byte bytes[] = new byte[buffer.limit()];
buffer.get(bytes, 0, buffer.limit());
// queue the raw message with the sender, block until space is available
syslogEvents.put(new RawSyslogEvent(bytes, sender));
buffer.clear();
}
}
}
@ -380,8 +554,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
logger.error("Error reading from DatagramChannel", e);
}
}
if (buffer != null) {
bufferPool.returnBuffer(buffer, 0);
try {
bufferPool.put(buffer);
} catch (InterruptedException e) {
// nothing to do here
}
}
}
@ -409,9 +588,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
*/
public static class SocketChannelReader implements ChannelReader {
private final BufferPool bufferPool;
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final BlockingQueue<ByteBuffer> bufferPool;
private final BlockingQueue<RawSyslogEvent> syslogEvents;
private final ProcessorLog logger;
private final ExecutorService executor;
private volatile boolean stopped = false;
@ -420,10 +598,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final int maxConnections;
private final AtomicInteger currentConnections = new AtomicInteger(0);
public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
final ProcessorLog logger, final int maxConnections) {
public SocketChannelReader(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger, final int maxConnections) {
this.bufferPool = bufferPool;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
this.maxConnections = maxConnections;
@ -486,8 +662,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// Clear out the operations the select is interested in until done reading
key.interestOps(0);
// Create and execute the read handler
final SocketChannelHandler handler = new SocketChannelHandler(key, this,
syslogParser, syslogEvents, logger);
final SocketChannelHandler handler = new SocketChannelHandler(key, this, syslogEvents, logger);
// and launch the thread
executor.execute(handler);
}
@ -546,7 +721,11 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public void completeConnection(SelectionKey key) {
// connection is done. Return the buffer to the pool
bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
try {
bufferPool.put((ByteBuffer) key.attachment());
} catch (InterruptedException e) {
// nothing to do here
}
currentConnections.decrementAndGet();
}
@ -565,16 +744,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final SelectionKey key;
private final SocketChannelReader dispatcher;
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final BlockingQueue<RawSyslogEvent> syslogEvents;
private final ProcessorLog logger;
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser,
final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) {
this.key = key;
this.dispatcher = dispatcher;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
}
@ -609,11 +785,9 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// check if at end of a message
if (currByte == '\n') {
// parse an event, reset the buffer
final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
socketChannel.socket().getInetAddress().toString());
logger.trace(event.getFullMessage());
syslogEvents.put(event); // block until space is available
String sender = socketChannel.socket().getInetAddress().toString();
// queue the raw event blocking until space is available, reset the buffer
syslogEvents.put(new RawSyslogEvent(currBytes.toByteArray(), sender));
currBytes.reset();
// Mark this as the start of the next message
socketBuffer.mark();
@ -655,4 +829,25 @@ public class ListenSyslog extends AbstractSyslogProcessor {
+ "maximum receive buffer");
}
/**
 * Pairs the raw bytes of a received message with the host/ip that sent it so the two
 * can be passed around together.
 *
 * <p>The byte array is stored and handed back by reference; no copy is made.</p>
 */
public static class RawSyslogEvent {

    final byte[] rawMessage;
    final String sender;

    /**
     * @param rawMessage the bytes of the message exactly as received
     * @param sender the host/ip the message came from
     */
    public RawSyslogEvent(byte[] rawMessage, String sender) {
        this.sender = sender;
        this.rawMessage = rawMessage;
    }

    /** @return the sender supplied at construction */
    public String getSender() {
        return sender;
    }

    /** @return the same array instance supplied at construction */
    public byte[] getRawMessage() {
        return rawMessage;
    }
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.StreamUtils;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"logs", "syslog", "attributes", "system", "event", "message"})
@CapabilityDescription("Parses the contents of a Syslog message and adds attributes to the FlowFile for each of the parts of the Syslog message")
@WritesAttributes({@WritesAttribute(attribute = "syslog.priority", description = "The priority of the Syslog message."),
@WritesAttribute(attribute = "syslog.severity", description = "The severity of the Syslog message derived from the priority."),
@WritesAttribute(attribute = "syslog.facility", description = "The facility of the Syslog message derived from the priority."),
@WritesAttribute(attribute = "syslog.version", description = "The optional version from the Syslog message."),
@WritesAttribute(attribute = "syslog.timestamp", description = "The timestamp of the Syslog message."),
@WritesAttribute(attribute = "syslog.hostname", description = "The hostname of the Syslog message."),
@WritesAttribute(attribute = "syslog.sender", description = "The hostname of the Syslog server that sent the message."),
@WritesAttribute(attribute = "syslog.body", description = "The body of the Syslog message, everything after the hostname.")})
@SeeAlso({ListenSyslog.class, PutSyslog.class})
/**
 * Reads the entire content of an incoming FlowFile, parses it as a single Syslog message and,
 * when parsing succeeds, copies the parsed fields onto the FlowFile as attributes.
 * FlowFiles that cannot be parsed are routed to {@link #REL_FAILURE} unchanged.
 */
public class ParseSyslog extends AbstractProcessor {

    /** Character set used to decode the message bytes; defaults to UTF-8. */
    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
        .name("Character Set")
        .description("Specifies the character set of the Syslog messages")
        .required(true)
        .defaultValue("UTF-8")
        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
        .build();

    /** Destination for FlowFiles whose content could not be parsed; no attributes are added. */
    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Any FlowFile that could not be parsed as a Syslog message will be transferred to this Relationship without any attributes being added")
        .build();

    /** Destination for FlowFiles that parsed successfully and received the Syslog attributes. */
    static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("Any FlowFile that is successfully parsed as a Syslog message will be routed to this Relationship.")
        .build();

    /**
     * @return a list containing only the {@link #CHARSET} property
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>(1);
        properties.add(CHARSET);
        return properties;
    }

    /**
     * @return the failure and success relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_FAILURE);
        relationships.add(REL_SUCCESS);
        return relationships;
    }

    /**
     * Pulls one FlowFile, parses its content as a Syslog message and routes it.
     *
     * @param context supplies the configured character set
     * @param session used to read, update and transfer the FlowFile
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final String charsetName = context.getProperty(CHARSET).getValue();
        final SyslogParser parser = new SyslogParser(Charset.forName(charsetName));

        // The whole content is loaded into memory because the parser works on a byte array.
        final byte[] buffer = new byte[(int) flowFile.getSize()];
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                StreamUtils.fillBuffer(in, buffer);
            }
        });

        final SyslogEvent event;
        try {
            event = parser.parseEvent(buffer, null);
        } catch (final ProcessException pe) {
            getLogger().error("Failed to parse {} as a Syslog message due to {}; routing to failure", new Object[] {flowFile, pe});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // NOTE(review): the parser's behavior for empty content is not visible from this file.
        // A null result is treated like an invalid event so that an empty FlowFile is routed
        // to failure instead of causing a NullPointerException here.
        if (event == null || !event.isValid()) {
            getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", new Object[] {flowFile});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        final Map<String, String> attributes = new HashMap<>(8);
        attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
        attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
        attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
        attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
        attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
        attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
        attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());

        flowFile = session.putAllAttributes(flowFile, attributes);
        session.transfer(flowFile, REL_SUCCESS);
    }
}

View File

@ -20,6 +20,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
@ -64,6 +65,7 @@ import java.util.regex.Pattern;
"or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " +
"above description, then it is routed to the invalid relationship. Valid messages are sent to the Syslog server and successes are routed to the success relationship, " +
"failures routed to the failure relationship.")
@SeeAlso({ListenSyslog.class, ParseSyslog.class})
public class PutSyslog extends AbstractSyslogProcessor {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()

View File

@ -53,6 +53,7 @@ org.apache.nifi.processors.standard.LogAttribute
org.apache.nifi.processors.standard.MergeContent
org.apache.nifi.processors.standard.ModifyBytes
org.apache.nifi.processors.standard.MonitorActivity
org.apache.nifi.processors.standard.ParseSyslog
org.apache.nifi.processors.standard.PostHTTP
org.apache.nifi.processors.standard.PutEmail
org.apache.nifi.processors.standard.PutDistributedMapCache

View File

@ -16,23 +16,7 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -40,9 +24,31 @@ import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.ListenSyslog.RawSyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestListenSyslog {
@ -100,8 +106,7 @@ public class TestListenSyslog {
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
event.getTransitUri());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp"));
} finally {
// unschedule to close connections
@ -151,8 +156,7 @@ public class TestListenSyslog {
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
event.getTransitUri());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
} finally {
// unschedule to close connections
@ -203,8 +207,7 @@ public class TestListenSyslog {
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
event.getTransitUri());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
} finally {
// unschedule to close connections
@ -212,6 +215,57 @@ public class TestListenSyslog {
}
}
/**
 * Verifies that several UDP messages received before a single trigger are written to one
 * FlowFile, separated by the configured message delimiter, and reported with one RECEIVE
 * provenance event.
 *
 * @throws IOException if the processor cannot be scheduled
 * @throws InterruptedException if interrupted while waiting for the sender thread to finish
 */
@Test
public void testBatching() throws IOException, InterruptedException {
    final ListenSyslog proc = new ListenSyslog();
    final TestRunner runner = TestRunners.newTestRunner(proc);
    runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
    runner.setProperty(ListenSyslog.PORT, "0");
    runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "25");
    runner.setProperty(ListenSyslog.MESSAGE_DELIMITER, "|");
    runner.setProperty(ListenSyslog.PARSE_MESSAGES, "false");

    // schedule to start listening on a random port
    final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
    final ProcessContext context = runner.getProcessContext();
    proc.onScheduled(context);

    // Everything after scheduling runs inside the try so that the processor is always
    // unscheduled, even when an early assertion or the sender fails.
    try {
        final int numMessages = 20;
        final int port = proc.getPort();
        Assert.assertTrue(port > 0);

        // send the UDP messages from a separate thread and wait for it to finish before triggering
        final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE.replaceAll("\\n", "")));
        sender.setDaemon(true);
        sender.start();
        sender.join();

        proc.onTrigger(context, processSessionFactory);
        runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);

        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
        Assert.assertEquals("0", flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
        Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
        Assert.assertFalse(StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key())));

        // the batch is a single FlowFile whose content is the messages joined by the "|" delimiter
        final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
        final String[] splits = content.split("\\|");
        Assert.assertEquals(numMessages, splits.length);

        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
        Assert.assertNotNull(events);
        Assert.assertEquals(1, events.size());

        final ProvenanceEventRecord event = events.get(0);
        Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
        Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp"));
    } finally {
        // unschedule to close connections
        proc.onUnscheduled();
    }
}
@Test
public void testInvalid() throws IOException, InterruptedException {
final ListenSyslog proc = new ListenSyslog();
@ -254,25 +308,69 @@ public class TestListenSyslog {
}
@Test
public void testErrorQueue() {
final SyslogEvent event1 = Mockito.mock(SyslogEvent.class);
Mockito.when(event1.getRawMessage()).thenThrow(new ProcessException("ERROR"));
final SyslogEvent event2 = new SyslogEvent.Builder()
.facility("fac").severity("sev")
.fullMessage("abc").hostname("host")
.msgBody("body").timestamp("123").valid(true)
.rawMessage("abc".getBytes(Charset.forName("UTF-8")))
.build();
final MockProcessor proc = new MockProcessor(Arrays.asList(event1, event2));
public void testParsingError() throws IOException {
final FailParseProcessor proc = new FailParseProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PORT, "12345");
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
runner.setProperty(ListenSyslog.PORT, "0");
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
try {
final int port = proc.getPort();
final DatagramSender sender = new DatagramSender(port, 1, 1, INVALID_MESSAGE);
sender.run();
// should keep re-processing event1 from the error queue
runner.run(3);
runner.assertTransferCount(ListenSyslog.REL_INVALID, 0);
proc.onTrigger(context, processSessionFactory);
runner.assertTransferCount(ListenSyslog.REL_INVALID, 1);
runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0);
} finally {
proc.onUnscheduled();
}
}
/**
 * Verifies that a message whose raw bytes cannot be read on the first attempt is left on the
 * error queue, and that the next run picks it up and transfers it successfully.
 *
 * @throws IOException declared for consistency with the other tests in this class
 */
@Test
public void testErrorQueue() throws IOException {
    final List<RawSyslogEvent> msgs = new ArrayList<>();
    // Encode with an explicit charset so the test does not depend on the platform default.
    msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(StandardCharsets.UTF_8), "sender-01"));
    msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(StandardCharsets.UTF_8), "sender-01"));

    // Add message that will throw a FlowFileAccessException the first time that we attempt to read
    // the contents but will succeed the second time.
    final IntegerHolder getMessageAttempts = new IntegerHolder(0);
    msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(StandardCharsets.UTF_8), "sender-01") {
        @Override
        public byte[] getRawMessage() {
            final int attempts = getMessageAttempts.incrementAndGet();
            if (attempts == 1) {
                throw new FlowFileAccessException("Unit test failure");
            } else {
                return VALID_MESSAGE.getBytes(StandardCharsets.UTF_8);
            }
        }
    });

    final CannedMessageProcessor proc = new CannedMessageProcessor(msgs);
    final TestRunner runner = TestRunners.newTestRunner(proc);
    runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "5");
    runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
    runner.setProperty(ListenSyslog.PORT, "0");
    runner.setProperty(ListenSyslog.PARSE_MESSAGES, "false");

    // first run: the two readable messages are batched together; the failing one goes to the error queue
    runner.run();
    assertEquals(1, proc.getErrorQueueSize());
    runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
    runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE + "\n" + VALID_MESSAGE);

    // running again should pull from the error queue
    runner.clearTransferState();
    runner.run();
    runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
    runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE);
}
@ -420,46 +518,39 @@ public class TestListenSyslog {
}
// A mock version of ListenSyslog that will queue the provided events
private static class MockProcessor extends ListenSyslog {
private List<SyslogEvent> eventList;
public MockProcessor(List<SyslogEvent> eventList) {
this.eventList = eventList;
}
private static class FailParseProcessor extends ListenSyslog {
@Override
protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser,
final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
return new ChannelReader() {
protected SyslogParser getParser() {
return new SyslogParser(StandardCharsets.UTF_8) {
@Override
public void open(int port, int maxBufferSize) throws IOException {
}
@Override
public int getPort() {
return 0;
}
@Override
public void stop() {
}
@Override
public void close() {
}
@Override
public void run() {
for (SyslogEvent event : eventList) {
syslogEvents.offer(event);
}
public SyslogEvent parseEvent(byte[] bytes, String sender) {
throw new ProcessException("Unit test intentionally failing");
}
};
}
}
/**
 * A ListenSyslog variant for tests that hands out a fixed list of events before falling back
 * to the normal message lookup of the parent class.
 */
private static class CannedMessageProcessor extends ListenSyslog {

    // Remaining canned events, consumed one per getMessage call.
    private final Iterator<RawSyslogEvent> cannedEvents;

    public CannedMessageProcessor(final List<RawSyslogEvent> events) {
        cannedEvents = events.iterator();
    }

    /**
     * Returns the parent's descriptors with PORT swapped for a same-named descriptor that
     * accepts any value (Validator.VALID). The replacement is appended at the end of the list.
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
        descriptors.remove(PORT);

        final PropertyDescriptor lenientPort = new PropertyDescriptor.Builder()
            .name(PORT.getName())
            .addValidator(Validator.VALID)
            .build();
        descriptors.add(lenientPort);

        return descriptors;
    }

    /**
     * Serves the next canned event while any remain; afterwards delegates to the parent.
     */
    @Override
    protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue) {
        return cannedEvents.hasNext()
            ? cannedEvents.next()
            : super.getMessage(longPoll, pollErrorQueue);
    }
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
/**
 * Unit tests for the ParseSyslog processor: one well-formed RFC3164-style message and one
 * message that must be routed to failure.
 */
public class TestParseSyslog {

    // Priority value inside the angle brackets; 34 = 4 * 8 + 2, matching FAC and SEV below.
    static final String PRI = "34";
    static final String SEV = "2";
    static final String FAC = "4";
    static final String TIME = "Oct 13 15:43:23";
    static final String HOST = "localhost.home";
    static final String BODY = "some message";

    static final String VALID_MESSAGE_RFC3164_0 = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";

    /**
     * A valid message goes to success and every parsed field is exposed as an attribute.
     */
    @Test
    public void testSuccessfulParse3164() {
        final TestRunner testRunner = runParseSyslog(VALID_MESSAGE_RFC3164_0.getBytes());
        testRunner.assertAllFlowFilesTransferred(ParseSyslog.REL_SUCCESS, 1);

        final MockFlowFile parsed = testRunner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).get(0);
        parsed.assertAttributeEquals(SyslogAttributes.BODY.key(), BODY);
        parsed.assertAttributeEquals(SyslogAttributes.FACILITY.key(), FAC);
        parsed.assertAttributeEquals(SyslogAttributes.HOSTNAME.key(), HOST);
        parsed.assertAttributeEquals(SyslogAttributes.PRIORITY.key(), PRI);
        parsed.assertAttributeEquals(SyslogAttributes.SEVERITY.key(), SEV);
        parsed.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME);
    }

    /**
     * Content that does not form a syslog message is routed to failure.
     */
    @Test
    public void testInvalidMessage() {
        final TestRunner testRunner = runParseSyslog("<hello> yesterday localhost\n".getBytes());
        testRunner.assertAllFlowFilesTransferred(ParseSyslog.REL_FAILURE, 1);
    }

    // Enqueues the given content on a fresh ParseSyslog runner, runs it once and returns the runner.
    private static TestRunner runParseSyslog(final byte[] content) {
        final TestRunner testRunner = TestRunners.newTestRunner(new ParseSyslog());
        testRunner.enqueue(content);
        testRunner.run();
        return testRunner;
    }
}