NIFI-1939 - Correct issue where ParseSyslog was unable to parse RFC3164 messages containg an IPv6 address as source

NIFI-1939 - Fix typo and adjust ListenSyslog as per PR feedback

This closes #1639.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Andre F de Miranda 2017-04-01 01:57:50 +11:00 committed by Bryan Bende
parent de67e5f7d5
commit 9ad7802284
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
4 changed files with 44 additions and 4 deletions

View File

@ -91,7 +91,7 @@ import org.apache.nifi.ssl.SSLContextService;
@WritesAttribute(attribute="syslog.facility", description="The facility of the Syslog message derived from the priority."), @WritesAttribute(attribute="syslog.facility", description="The facility of the Syslog message derived from the priority."),
@WritesAttribute(attribute="syslog.version", description="The optional version from the Syslog message."), @WritesAttribute(attribute="syslog.version", description="The optional version from the Syslog message."),
@WritesAttribute(attribute="syslog.timestamp", description="The timestamp of the Syslog message."), @WritesAttribute(attribute="syslog.timestamp", description="The timestamp of the Syslog message."),
@WritesAttribute(attribute="syslog.hostname", description="The hostname of the Syslog message."), @WritesAttribute(attribute="syslog.hostname", description="The hostname or IP address of the Syslog message."),
@WritesAttribute(attribute="syslog.sender", description="The hostname of the Syslog server that sent the message."), @WritesAttribute(attribute="syslog.sender", description="The hostname of the Syslog server that sent the message."),
@WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."), @WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."),
@WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " + @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +

View File

@ -56,13 +56,17 @@ import org.apache.nifi.stream.io.StreamUtils;
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"logs", "syslog", "attributes", "system", "event", "message"}) @Tags({"logs", "syslog", "attributes", "system", "event", "message"})
@CapabilityDescription("Parses the contents of a Syslog message and adds attributes to the FlowFile for each of the parts of the Syslog message") @CapabilityDescription("Attempts to parses the contents of a Syslog message in accordance to RFC5424 and RFC3164 " +
"formats and adds attributes to the FlowFile for each of the parts of the Syslog message." +
"Note: Be mindfull that RFC3164 is informational and a wide range of different implementations are present in" +
" the wild. If messages fail parsing, considering using RFC5424 or using a generic parsing processors such as " +
"ExtractGrok.")
@WritesAttributes({@WritesAttribute(attribute = "syslog.priority", description = "The priority of the Syslog message."), @WritesAttributes({@WritesAttribute(attribute = "syslog.priority", description = "The priority of the Syslog message."),
@WritesAttribute(attribute = "syslog.severity", description = "The severity of the Syslog message derived from the priority."), @WritesAttribute(attribute = "syslog.severity", description = "The severity of the Syslog message derived from the priority."),
@WritesAttribute(attribute = "syslog.facility", description = "The facility of the Syslog message derived from the priority."), @WritesAttribute(attribute = "syslog.facility", description = "The facility of the Syslog message derived from the priority."),
@WritesAttribute(attribute = "syslog.version", description = "The optional version from the Syslog message."), @WritesAttribute(attribute = "syslog.version", description = "The optional version from the Syslog message."),
@WritesAttribute(attribute = "syslog.timestamp", description = "The timestamp of the Syslog message."), @WritesAttribute(attribute = "syslog.timestamp", description = "The timestamp of the Syslog message."),
@WritesAttribute(attribute = "syslog.hostname", description = "The hostname of the Syslog message."), @WritesAttribute(attribute = "syslog.hostname", description = "The hostname or IP address of the Syslog message."),
@WritesAttribute(attribute = "syslog.sender", description = "The hostname of the Syslog server that sent the message."), @WritesAttribute(attribute = "syslog.sender", description = "The hostname of the Syslog server that sent the message."),
@WritesAttribute(attribute = "syslog.body", description = "The body of the Syslog message, everything after the hostname.")}) @WritesAttribute(attribute = "syslog.body", description = "The body of the Syslog message, everything after the hostname.")})
@SeeAlso({ListenSyslog.class, PutSyslog.class}) @SeeAlso({ListenSyslog.class, PutSyslog.class})

View File

@ -52,7 +52,7 @@ public class SyslogParser {
// stamp MMM d HH:mm:ss, single digit date has two spaces // stamp MMM d HH:mm:ss, single digit date has two spaces
"([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" + "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
"\\s" + // separator "\\s" + // separator
"([\\w][\\w\\d\\.@-]*)" + // host "([\\w][\\w\\d(\\.|\\:)@-]*)" + // host
"\\s(.*)$"; // body "\\s(.*)$"; // body
public static final Collection<Pattern> MESSAGE_PATTERNS; public static final Collection<Pattern> MESSAGE_PATTERNS;

View File

@ -29,13 +29,18 @@ public class TestParseSyslog {
static final String FAC = "4"; static final String FAC = "4";
static final String TIME = "Oct 13 15:43:23"; static final String TIME = "Oct 13 15:43:23";
static final String HOST = "localhost.home"; static final String HOST = "localhost.home";
static final String IPV6SRC = "fe80::216:3300:eeaa:eeaa";
static final String IPV4SRC = "8.8.4.4";
static final String BODY = "some message"; static final String BODY = "some message";
static final String VALID_MESSAGE_RFC3164_0 = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n"; static final String VALID_MESSAGE_RFC3164_0 = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
static final String VALID_MESSAGE_RFC3164_1 = "<" + PRI + ">" + TIME + " " + IPV6SRC + " " + BODY + "\n";
static final String VALID_MESSAGE_RFC3164_2 = "<" + PRI + ">" + TIME + " " + IPV4SRC + " " + BODY + "\n";
@Test @Test
public void testSuccessfulParse3164() { public void testSuccessfulParse3164() {
final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog()); final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog());
runner.enqueue(VALID_MESSAGE_RFC3164_0.getBytes()); runner.enqueue(VALID_MESSAGE_RFC3164_0.getBytes());
runner.run(); runner.run();
@ -49,6 +54,37 @@ public class TestParseSyslog {
mff.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME); mff.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME);
} }
@Test
public void testValidIPv6Source() {
final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog());
runner.enqueue(VALID_MESSAGE_RFC3164_1.getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(ParseSyslog.REL_SUCCESS, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).get(0);
mff.assertAttributeEquals(SyslogAttributes.BODY.key(), BODY);
mff.assertAttributeEquals(SyslogAttributes.FACILITY.key(), FAC);
mff.assertAttributeEquals(SyslogAttributes.HOSTNAME.key(), IPV6SRC);
mff.assertAttributeEquals(SyslogAttributes.PRIORITY.key(), PRI);
mff.assertAttributeEquals(SyslogAttributes.SEVERITY.key(), SEV);
mff.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME);
}
@Test
public void testValidIPv4Source() {
final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog());
runner.enqueue(VALID_MESSAGE_RFC3164_2.getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(ParseSyslog.REL_SUCCESS, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).get(0);
mff.assertAttributeEquals(SyslogAttributes.BODY.key(), BODY);
mff.assertAttributeEquals(SyslogAttributes.FACILITY.key(), FAC);
mff.assertAttributeEquals(SyslogAttributes.HOSTNAME.key(), IPV4SRC);
mff.assertAttributeEquals(SyslogAttributes.PRIORITY.key(), PRI);
mff.assertAttributeEquals(SyslogAttributes.SEVERITY.key(), SEV);
mff.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME);
}
@Test @Test
public void testInvalidMessage() { public void testInvalidMessage() {