NIFI-274 - Fixing TestListenSyslog, fixing default buffer size to be bytes, adding syslog.protocol to attributes

- Adding syslog.port to ListenSyslog attributes, logging at warn level when rejecting tcp connections
         - Adding @InputRequirement to processors and adding appropriate send and receive provenance events
This commit is contained in:
Bryan Bende 2015-11-04 09:01:52 -05:00 committed by Tony Kurc
parent 5611dac3f8
commit 618f22e110
5 changed files with 104 additions and 17 deletions

View File

@ -64,7 +64,9 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor {
HOSTNAME("syslog.hostname"),
SENDER("syslog.sender"),
BODY("syslog.body"),
VALID("syslog.valid");
VALID("syslog.valid"),
PROTOCOL("syslog.protocol"),
PORT("syslog.pprt");
private String key;

View File

@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -68,7 +70,7 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
"expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
@ -88,6 +90,8 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
@WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."),
@WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +
"If this value is false, the other attributes will be empty and only the original message will be available in the content."),
@WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."),
@WritesAttribute(attribute="syslog.port", description="The port over which the Syslog message was received."),
@WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
public class ListenSyslog extends AbstractSyslogProcessor {
@ -97,7 +101,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
"incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " +
"from an incoming connection until the buffer is full, or the connection is closed. ")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("65507 KB")
.defaultValue("65507 B")
.required(true)
.build();
public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
@ -110,8 +114,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.required(true)
.build();
public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max number of TCP connections")
.description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
.name("Max Number of TCP Connections")
.description("The maximum number of concurrent connections to accept Syslog messages in TCP mode.")
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2")
.required(true)
@ -142,8 +146,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(PORT);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(CHARSET);
descriptors.add(MAX_CONNECTIONS);
descriptors.add(CHARSET);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
@ -184,7 +188,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
if (protocol.equals(UDP_VALUE.getValue())) {
maxConnections = 1;
} else{
} else {
maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
}
@ -240,6 +244,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
final SyslogEvent event = initialEvent;
final String port = context.getProperty(PORT).getValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final Map<String,String> attributes = new HashMap<>();
attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
@ -251,11 +257,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
attributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
attributes.put(SyslogAttributes.PORT.key(), port);
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
final String transitUri = new StringBuilder().append(protocol).append("://").append(event.getSender())
.append(":").append(port).toString();
try {
// write the raw bytes of the message as the FlowFile content
flowFile = session.write(flowFile, new OutputStreamCallback() {
@ -268,6 +279,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
if (event.isValid()) {
getLogger().info("Transferring {} to success", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, transitUri);
} else {
getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
session.transfer(flowFile, REL_INVALID);
@ -454,7 +466,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// Check for available connections
if (currentConnections.incrementAndGet() > maxConnections){
currentConnections.decrementAndGet();
logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
logger.warn("Rejecting connection from {} because max connections has been met",
new Object[]{ socketChannel.getRemoteAddress().toString() });
IOUtils.closeQuietly(socketChannel);
continue;
}
@ -494,8 +507,11 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public int getPort() {
// Return the port for the key listening for accepts
for(SelectionKey key : selector.keys()){
if (key.isValid() && key.isAcceptable()) {
return ((SocketChannel)key.channel()).socket().getLocalPort();
if (key.isValid()) {
final Channel channel = key.channel();
if (channel instanceof ServerSocketChannel) {
return ((ServerSocketChannel)channel).socket().getLocalPort();
}
}
}
return 0;
@ -619,7 +635,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
// Treat same as closed socket
// Treat same as closed socket
eof = true;
} finally {
if(eof == true) {

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -33,6 +34,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
import java.net.InetAddress;
@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@TriggerWhenEmpty
@Tags({"syslog", "put", "udp", "tcp", "logs"})
@CapabilityDescription("Sends Syslog messages to a given host and port over TCP or UDP. Messages are constructed from the \"Message ___\" properties of the processor " +
@ -59,7 +62,7 @@ import java.util.regex.Pattern;
"(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The constructed messages are checked against regular expressions for " +
"RFC5424 and RFC3164 formatted messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
"or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " +
"above description, then it is routed to the invalid relationship. Valid messages are pushed to Syslog with successes routed to the success relationship, and " +
"above description, then it is routed to the invalid relationship. Valid messages are sent to the Syslog server and successes are routed to the success relationship, " +
"failures routed to the failure relationship.")
public class PutSyslog extends AbstractSyslogProcessor {
@ -277,9 +280,14 @@ public class PutSyslog extends AbstractSyslogProcessor {
}
}
final String port = context.getProperty(PORT).getValue();
final String host = context.getProperty(HOSTNAME).getValue();
final String transitUri = new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null);
try {
for (FlowFile flowFile : flowFiles) {
final StopWatch timer = new StopWatch(true);
final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
@ -304,6 +312,11 @@ public class PutSyslog extends AbstractSyslogProcessor {
}
sender.send(messageBuilder.toString());
timer.stop();
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().send(flowFile, transitUri, duration, true);
getLogger().info("Transferring {} to success", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
} catch (IOException e) {

View File

@ -16,12 +16,15 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -89,7 +92,16 @@ public class TestListenSyslog {
Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
checkFlowFile(flowFile);
checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(numMessages, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
event.getTransitUri());
} finally {
// unschedule to close connections
@ -131,7 +143,17 @@ public class TestListenSyslog {
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
checkFlowFile(flowFile);
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(numMessages, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
event.getTransitUri());
} finally {
// unschedule to close connections
proc.onUnscheduled();
@ -143,6 +165,7 @@ public class TestListenSyslog {
final ListenSyslog proc = new ListenSyslog();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5");
runner.setProperty(ListenSyslog.PORT, "0");
// schedule to start listening on a random port
@ -172,7 +195,17 @@ public class TestListenSyslog {
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
checkFlowFile(flowFile);
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(numMessages, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
event.getTransitUri());
} finally {
// unschedule to close connections
proc.onUnscheduled();
@ -210,6 +243,8 @@ public class TestListenSyslog {
proc.onTrigger(context, processSessionFactory);
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
}
// all messages should be transferred to invalid
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
} finally {
@ -241,7 +276,7 @@ public class TestListenSyslog {
}
private void checkFlowFile(MockFlowFile flowFile) {
private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) {
flowFile.assertContentEquals(VALID_MESSAGE);
Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
@ -250,6 +285,9 @@ public class TestListenSyslog {
Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
Assert.assertEquals(protocol, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key())));
}
/**
@ -392,7 +430,7 @@ public class TestListenSyslog {
@Override
protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser,
final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
return new ChannelReader() {
@Override
public void open(int port, int maxBufferSize) throws IOException {

View File

@ -17,6 +17,8 @@
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
@ -70,6 +72,14 @@ public class TestPutSyslog {
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
Assert.assertEquals(expectedMessage, sender.messages.get(0));
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
Assert.assertEquals("UDP://localhost:12345", event.getTransitUri());
}
@Test
@ -95,6 +105,14 @@ public class TestPutSyslog {
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", ""));
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
Assert.assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
Assert.assertEquals("TCP://localhost:12345", event.getTransitUri());
}
@Test