NIFI-5337 Syslog 5424 Record Reader and nifi-syslog-utils

- Create nifi-syslog-utils to move syslog parsing functionalty to a central location shared by the processors and serialization/record system.
- Refactor Processors to use these utils
- Update 5424 syslog classes using simple-syslog-5424 to pick up new changes to support this work, as well as keep dependencies/types from bleeding out to the
processors or readers
- Refactor Syslog5424Event and Parser
- Create Syslog5424RecordReader
- per review, handle blank message differently from eof
- name schema per review

This closes #2816.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Otto Fowler 2018-06-22 15:44:38 -04:00 committed by Bryan Bende
parent ec7f131602
commit b10220439c
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
41 changed files with 1288 additions and 371 deletions

View File

@ -0,0 +1,48 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-extension-utils</artifactId>
<version>1.8.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-syslog-utils</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.github.palindromicity</groupId>
<artifactId>simple-syslog-5424</artifactId>
<version>0.0.7</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.8.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.syslog;
package org.apache.nifi.syslog.attributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
@ -23,13 +23,20 @@ import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
*/
public enum Syslog5424Attributes implements FlowFileAttributeKey {
APP_NAME("syslog.appName"),
PROCID("syslog.procid"),
MESSAGEID("syslog.messageid"),
STRUCTURED_BASE("syslog.structuredData"),
STRUCTURED_ELEMENT_ID_FMT("syslog.structuredData.%s"),
STRUCTURED_ELEMENT_ID_PNAME_FMT("syslog.structuredData.%s.%s"),
STRUCTURED_ELEMENT_ID_PNAME_PATTERN("syslog.structuredData\\.(.*)\\.(.*)$");
SYSLOG_APP_NAME("syslog.appName"),
SYSLOG_PROCID("syslog.procid"),
SYSLOG_MESSAGEID("syslog.messageid"),
SYSLOG_STRUCTURED_BASE("syslog.structuredData"),
SYSLOG_STRUCTURED_ELEMENT_ID_FMT("syslog.structuredData.%s"),
SYSLOG_STRUCTURED_ELEMENT_ID_PNAME_FMT("syslog.structuredData.%s.%s"),
SYSLOG_STRUCTURED_ELEMENT_ID_PNAME_PATTERN("syslog.structuredData\\.(.*)\\.(.*)$"),
APP_NAME("appName"),
PROCID("procid"),
MESSAGEID("messageid"),
STRUCTURED_BASE("structuredData"),
STRUCTURED_ELEMENT_ID_FMT("structuredData.%s"),
STRUCTURED_ELEMENT_ID_PNAME_FMT("structuredData.%s.%s"),
STRUCTURED_ELEMENT_ID_PNAME_PATTERN("structuredData\\.(.*)\\.(.*)$");
private String key;
Syslog5424Attributes(String key) {

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.syslog;
package org.apache.nifi.syslog.attributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
@ -23,17 +23,29 @@ import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
*/
public enum SyslogAttributes implements FlowFileAttributeKey {
PRIORITY("syslog.priority"),
SEVERITY("syslog.severity"),
FACILITY("syslog.facility"),
VERSION("syslog.version"),
TIMESTAMP("syslog.timestamp"),
HOSTNAME("syslog.hostname"),
SENDER("syslog.sender"),
BODY("syslog.body"),
VALID("syslog.valid"),
PROTOCOL("syslog.protocol"),
PORT("syslog.port");
SYSLOG_PRIORITY("syslog.priority"),
SYSLOG_SEVERITY("syslog.severity"),
SYSLOG_FACILITY("syslog.facility"),
SYSLOG_VERSION("syslog.version"),
SYSLOG_TIMESTAMP("syslog.timestamp"),
SYSLOG_HOSTNAME("syslog.hostname"),
SYSLOG_SENDER("syslog.sender"),
SYSLOG_BODY("syslog.body"),
SYSLOG_VALID("syslog.valid"),
SYSLOG_PROTOCOL("syslog.protocol"),
SYSLOG_PORT("syslog.port"),
PRIORITY("priority"),
SEVERITY("severity"),
FACILITY("facility"),
VERSION("version"),
TIMESTAMP("timestamp"),
HOSTNAME("hostname"),
SENDER("sender"),
BODY("body"),
VALID("valid"),
PROTOCOL("protocol"),
PORT("port");
private String key;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.syslog;
package org.apache.nifi.syslog.events;
import java.util.Map;
@ -22,11 +22,12 @@ import java.util.Map;
* Encapsulates the parsed information for a single Syslog 5424 event.
*/
public class Syslog5424Event {
private final Map<String,String> fieldMap;
private final Map<String, Object> fieldMap;
private final String fullMessage;
private final byte[] rawMessage;
private final String sender;
private final boolean valid;
private final Exception exception;
private Syslog5424Event(final Builder builder) {
this.fieldMap = builder.fieldMap;
@ -34,12 +35,17 @@ public class Syslog5424Event {
this.rawMessage = builder.rawMessage;
this.sender = builder.sender;
this.valid = builder.valid;
this.exception = builder.exception;
}
public Map<String,String> getFieldMap() {
public Map<String, Object> getFieldMap() {
return fieldMap;
}
public Exception getException() {
return exception;
}
public String getFullMessage() {
return fullMessage;
}
@ -57,9 +63,10 @@ public class Syslog5424Event {
}
public static final class Builder {
private Exception exception;
private String fullMessage;
private String sender;
private Map<String, String> fieldMap;
private Map<String, Object> fieldMap;
private byte[] rawMessage;
private boolean valid;
@ -68,6 +75,7 @@ public class Syslog5424Event {
this.sender = null;
this.fullMessage = null;
this.valid = false;
this.exception = null;
}
public Builder sender(String sender) {
@ -75,7 +83,12 @@ public class Syslog5424Event {
return this;
}
public Builder fieldMap(Map<String, String> fieldMap) {
public Builder exception(Exception exception) {
this.exception = exception;
return this;
}
public Builder fieldMap(Map<String, Object> fieldMap) {
this.fieldMap = fieldMap;
return this;
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.syslog;
package org.apache.nifi.syslog.events;
/**
* Encapsulates the parsed information for a single Syslog event.

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog.keyproviders;
import com.github.palindromicity.syslog.KeyProvider;
import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import java.util.regex.Pattern;
public class SimpleKeyProvider implements KeyProvider {
private Pattern pattern;
public SimpleKeyProvider() {
}
@Override
public String getMessage() {
return SyslogAttributes.BODY.key();
}
@Override
public String getHeaderAppName(){
return Syslog5424Attributes.APP_NAME.key();
}
@Override
public String getHeaderHostName() {
return SyslogAttributes.HOSTNAME.key();
}
@Override
public String getHeaderPriority() {
return SyslogAttributes.PRIORITY.key();
}
@Override
public String getHeaderFacility() {
return SyslogAttributes.FACILITY.key();
}
@Override
public String getHeaderSeverity() {
return SyslogAttributes.SEVERITY.key();
}
@Override
public String getHeaderProcessId() {
return Syslog5424Attributes.PROCID.key();
}
@Override
public String getHeaderTimeStamp() {
return SyslogAttributes.TIMESTAMP.key();
}
@Override
public String getHeaderMessageId() {
return Syslog5424Attributes.MESSAGEID.key();
}
@Override
public String getHeaderVersion() {
return SyslogAttributes.VERSION.key();
}
@Override
public String getStructuredBase() {
return Syslog5424Attributes.STRUCTURED_BASE.key();
}
@Override
public String getStructuredElementIdFormat() {
return Syslog5424Attributes.STRUCTURED_ELEMENT_ID_FMT.key();
}
@Override
public String getStructuredElementIdParamNameFormat() {
return Syslog5424Attributes.STRUCTURED_ELEMENT_ID_PNAME_FMT.key();
}
@Override
public Pattern getStructuredElementIdParamNamePattern() {
if (pattern == null) {
pattern = Pattern.compile(Syslog5424Attributes.STRUCTURED_ELEMENT_ID_PNAME_PATTERN.key());
}
return pattern;
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog.keyproviders;
import com.github.palindromicity.syslog.KeyProvider;
import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import java.util.regex.Pattern;
public class SyslogPrefixedKeyProvider implements KeyProvider {
private Pattern pattern;
public SyslogPrefixedKeyProvider() {
}
@Override
public String getMessage() {
return SyslogAttributes.SYSLOG_BODY.key();
}
@Override
public String getHeaderAppName() {
return Syslog5424Attributes.SYSLOG_APP_NAME.key();
}
@Override
public String getHeaderHostName() {
return SyslogAttributes.SYSLOG_HOSTNAME.key();
}
@Override
public String getHeaderPriority() {
return SyslogAttributes.SYSLOG_PRIORITY.key();
}
@Override
public String getHeaderFacility() {
return SyslogAttributes.SYSLOG_FACILITY.key();
}
@Override
public String getHeaderSeverity() {
return SyslogAttributes.SYSLOG_SEVERITY.key();
}
@Override
public String getHeaderProcessId() {
return Syslog5424Attributes.SYSLOG_PROCID.key();
}
@Override
public String getHeaderTimeStamp() {
return SyslogAttributes.SYSLOG_TIMESTAMP.key();
}
@Override
public String getHeaderMessageId() {
return Syslog5424Attributes.SYSLOG_MESSAGEID.key();
}
@Override
public String getHeaderVersion() {
return SyslogAttributes.SYSLOG_VERSION.key();
}
@Override
public String getStructuredBase() {
return Syslog5424Attributes.SYSLOG_STRUCTURED_BASE.key();
}
@Override
public String getStructuredElementIdFormat() {
return Syslog5424Attributes.SYSLOG_STRUCTURED_ELEMENT_ID_FMT.key();
}
@Override
public String getStructuredElementIdParamNameFormat() {
return Syslog5424Attributes.SYSLOG_STRUCTURED_ELEMENT_ID_PNAME_FMT.key();
}
@Override
public Pattern getStructuredElementIdParamNamePattern() {
if (pattern == null) {
pattern = Pattern.compile(Syslog5424Attributes.SYSLOG_STRUCTURED_ELEMENT_ID_PNAME_PATTERN.key());
}
return pattern;
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog.parsers;
import com.github.palindromicity.syslog.KeyProvider;
import com.github.palindromicity.syslog.NilPolicy;
import com.github.palindromicity.syslog.StructuredDataPolicy;
import com.github.palindromicity.syslog.SyslogParserBuilder;
import org.apache.nifi.syslog.events.Syslog5424Event;
import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider;
import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* Parses a Syslog message from a ByteBuffer into a Syslog5424Event instance.
* For 5424 we use simple-syslog-5424 since it parsers out structured data.
*/
public class StrictSyslog5424Parser {
private Charset charset;
private com.github.palindromicity.syslog.SyslogParser parser;
public StrictSyslog5424Parser() {
this(StandardCharsets.UTF_8, NilHandlingPolicy.NULL, NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
}
public StrictSyslog5424Parser(final Charset charset, final NilHandlingPolicy nilPolicy,
NifiStructuredDataPolicy structuredDataPolicy, KeyProvider keyProvider) {
this.charset = charset;
parser = new SyslogParserBuilder()
.withNilPolicy(NilPolicy.valueOf(nilPolicy.name()))
.withStructuredDataPolicy(StructuredDataPolicy.valueOf(structuredDataPolicy.name()))
.withKeyProvider(keyProvider)
.build();
}
/**
* Parses a Syslog5424Event from a {@code ByteBuffer}.
*
* @param buffer a {@code ByteBuffer} containing a syslog message
* @return a Syslog5424Event parsed from the {@code {@code byte array}}
*/
public Syslog5424Event parseEvent(final ByteBuffer buffer) {
return parseEvent(buffer, null);
}
/**
* Parses a Syslog5424Event from a {@code ByteBuffer}.
*
* @param buffer a {@code ByteBuffer} containing a syslog message
* @param sender the hostname of the syslog server that sent the message
* @return a Syslog5424Event parsed from the {@code byte array}
*/
public Syslog5424Event parseEvent(final ByteBuffer buffer, final String sender) {
if (buffer == null) {
return null;
}
return parseEvent(bufferToBytes(buffer), sender);
}
/**
* Parses a Syslog5424Event from a {@code byte array}.
*
* @param bytes a {@code byte array} containing a syslog message
* @param sender the hostname of the syslog server that sent the message
* @return a Syslog5424Event parsed from the {@code byte array}
*/
public Syslog5424Event parseEvent(final byte[] bytes, final String sender) {
if (bytes == null || bytes.length == 0) {
return null;
}
// remove trailing new line before parsing
int length = bytes.length;
if (bytes[length - 1] == '\n') {
length = length - 1;
}
final String message = new String(bytes, 0, length, charset);
final Syslog5424Event.Builder builder = new Syslog5424Event.Builder()
.valid(false).fullMessage(message).rawMessage(bytes).sender(sender);
try {
parser.parseLine(message, builder::fieldMap);
builder.valid(true);
} catch (Exception e) {
// this is not a valid 5424 message
builder.valid(false);
builder.exception(e);
}
// either invalid w/original msg, or fully parsed event
return builder.build();
}
public String getCharsetName() {
return charset == null ? StandardCharsets.UTF_8.name() : charset.name();
}
private byte[] bufferToBytes(ByteBuffer buffer) {
if (buffer == null) {
return null;
}
if (buffer.position() != 0) {
buffer.flip();
}
byte bytes[] = new byte[buffer.limit()];
buffer.get(bytes, 0, buffer.limit());
return bytes;
}
}

View File

@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.syslog;
package org.apache.nifi.syslog.parsers;
import org.apache.nifi.syslog.events.SyslogEvent;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog.utils;
/**
* Policy for handling Structured Data
* must match the simple-syslog-5424 StructuredDataPolicy
*/
public enum NifiStructuredDataPolicy {
/**
* The Structured Data will be flattened per the KeyProvider provided values.
*/
FLATTEN,
/**
* The Structued Data will be returned as a Map field named structuredData.
* Each map entry will have the value of the Structured Data ID, and a value
* of a map of each element param name and value
*/
MAP_OF_MAPS
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog.utils;
/**
* Policies available for handling NIL '-' values.
* Must match the simple-syslog-5424 NilPolicy
*/
public enum NilHandlingPolicy {
/**
* a nil value will result msg part being omitted from the map.
*/
OMIT,
/**
* a nil value will result in a null value in the map.
*/
NULL,
/**
* a nil value will result in a '-' symbol in the map.
*/
DASH
}

View File

@ -14,14 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
package org.apache.nifi.syslog;
import com.github.palindromicity.syslog.NilPolicy;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.standard.syslog.StrictSyslog5424Parser;
import org.apache.nifi.processors.standard.syslog.Syslog5424Attributes;
import org.apache.nifi.processors.standard.syslog.Syslog5424Event;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.Syslog5424Event;
import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider;
import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -39,9 +40,9 @@ public abstract class BaseStrictSyslog5424ParserTest {
private static final String NIL_VALUE = "-";
private StrictSyslog5424Parser parser;
protected abstract NilPolicy getPolicy();
protected abstract NilHandlingPolicy getPolicy();
protected void validateForPolicy(String expected, String actual) {
protected void validateForPolicy(String expected, Object actual) {
switch (getPolicy()) {
case DASH:
Assert.assertEquals(actual, NIL_VALUE);
@ -55,7 +56,7 @@ public abstract class BaseStrictSyslog5424ParserTest {
@Before
public void setup() {
parser = new StrictSyslog5424Parser(CHARSET, getPolicy());
parser = new StrictSyslog5424Parser(CHARSET, getPolicy(), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
}
@Test
@ -82,25 +83,25 @@ public abstract class BaseStrictSyslog5424ParserTest {
Assert.assertNotNull(event);
Assert.assertTrue(event.isValid());
Assert.assertFalse(event.getFieldMap().isEmpty());
Map<String,String> fieldMap = event.getFieldMap();
Assert.assertEquals(pri, fieldMap.get(SyslogAttributes.PRIORITY.key()));
Assert.assertEquals("2", fieldMap.get(SyslogAttributes.SEVERITY.key()));
Assert.assertEquals("4", fieldMap.get(SyslogAttributes.FACILITY.key()));
Assert.assertEquals(version, fieldMap.get(SyslogAttributes.VERSION.key()));
Assert.assertEquals(stamp, fieldMap.get(SyslogAttributes.TIMESTAMP.key()));
Assert.assertEquals(host, fieldMap.get(SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(appName, fieldMap.get(Syslog5424Attributes.APP_NAME.key()));
validateForPolicy(procId, fieldMap.get(Syslog5424Attributes.PROCID.key()));
Assert.assertEquals(msgId, fieldMap.get(Syslog5424Attributes.MESSAGEID.key()));
Map<String, Object> fieldMap = event.getFieldMap();
Assert.assertEquals(pri, fieldMap.get(SyslogAttributes.SYSLOG_PRIORITY.key()));
Assert.assertEquals("2", fieldMap.get(SyslogAttributes.SYSLOG_SEVERITY.key()));
Assert.assertEquals("4", fieldMap.get(SyslogAttributes.SYSLOG_FACILITY.key()));
Assert.assertEquals(version, fieldMap.get(SyslogAttributes.SYSLOG_VERSION.key()));
Assert.assertEquals(stamp, fieldMap.get(SyslogAttributes.SYSLOG_TIMESTAMP.key()));
Assert.assertEquals(host, fieldMap.get(SyslogAttributes.SYSLOG_HOSTNAME.key()));
Assert.assertEquals(appName, fieldMap.get(Syslog5424Attributes.SYSLOG_APP_NAME.key()));
validateForPolicy(procId, fieldMap.get(Syslog5424Attributes.SYSLOG_PROCID.key()));
Assert.assertEquals(msgId, fieldMap.get(Syslog5424Attributes.SYSLOG_MESSAGEID.key()));
Pattern structuredPattern = new StrictSyslog5424Parser.NifiKeyProvider().getStructuredElementIdParamNamePattern();
fieldMap.forEach((key,value) -> {
if (!StringUtils.isBlank(value)) {
Assert.assertFalse(structuredPattern.matcher(value).matches());
Pattern structuredPattern = new SyslogPrefixedKeyProvider().getStructuredElementIdParamNamePattern();
fieldMap.forEach((key, value) -> {
if (value != null) {
Assert.assertFalse(structuredPattern.matcher(key).matches());
}
});
Assert.assertEquals(body, fieldMap.get(SyslogAttributes.BODY.key()));
Assert.assertEquals(body, fieldMap.get(SyslogAttributes.SYSLOG_BODY.key()));
Assert.assertEquals(message, event.getFullMessage());
Assert.assertNull(event.getSender());
}
@ -132,7 +133,7 @@ public abstract class BaseStrictSyslog5424ParserTest {
@Test
public void testTrailingNewLine() {
final String message = "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
"ID47 - BOM'su root' failed for lonvick on /dev/pts/8\n";
"ID47 - BOM'su root' failed for lonvick on /dev/pts/8\n";
final byte[] bytes = message.getBytes(CHARSET);
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);

View File

@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
package org.apache.nifi.syslog;
import com.github.palindromicity.syslog.NilPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
public class StrictSyslog5424ParserDashPolicyTest extends BaseStrictSyslog5424ParserTest {
protected NilPolicy getPolicy() {
return NilPolicy.DASH;
protected NilHandlingPolicy getPolicy() {
return NilHandlingPolicy.DASH;
}
}

View File

@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
package org.apache.nifi.syslog;
import com.github.palindromicity.syslog.NilPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
public class StrictSyslog5424ParserNullPolicyTest extends BaseStrictSyslog5424ParserTest {
protected NilPolicy getPolicy() {
return NilPolicy.NULL;
protected NilHandlingPolicy getPolicy() {
return NilHandlingPolicy.NULL;
}
}

View File

@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
package org.apache.nifi.syslog;
import com.github.palindromicity.syslog.NilPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
public class StrictSyslog5424ParserOmitPolicyTest extends BaseStrictSyslog5424ParserTest {
protected NilPolicy getPolicy() {
return NilPolicy.OMIT;
protected NilHandlingPolicy getPolicy() {
return NilHandlingPolicy.OMIT;
}
}

View File

@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
package org.apache.nifi.syslog;
import org.apache.nifi.processors.standard.syslog.SyslogEvent;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

View File

@ -31,6 +31,7 @@
<module>nifi-hadoop-utils</module>
<module>nifi-processor-utils</module>
<module>nifi-reporting-utils</module>
<module>nifi-syslog-utils</module>
</modules>
</project>

View File

@ -29,6 +29,11 @@
<artifactId>nifi-processor-utils</artifactId>
<version>1.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-syslog-utils</artifactId>
<version>1.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
@ -327,10 +332,6 @@
<version>1.8.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.palindromicity</groupId>
<artifactId>simple-syslog-5424</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -48,12 +48,12 @@ import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.processors.standard.syslog.SyslogEvent;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser;
import javax.net.ssl.SSLContext;
import java.io.IOException;
@ -419,8 +419,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final String protocol = context.getProperty(PROTOCOL).getValue();
final Map<String, String> defaultAttributes = new HashMap<>(4);
defaultAttributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
defaultAttributes.put(SyslogAttributes.PORT.key(), port);
defaultAttributes.put(SyslogAttributes.SYSLOG_PROTOCOL.key(), protocol);
defaultAttributes.put(SyslogAttributes.SYSLOG_PORT.key(), port);
defaultAttributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
@ -461,7 +461,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
FlowFile invalidFlowFile = session.create();
invalidFlowFile = session.putAllAttributes(invalidFlowFile, defaultAttributes);
if (sender != null) {
invalidFlowFile = session.putAttribute(invalidFlowFile, SyslogAttributes.SENDER.key(), sender);
invalidFlowFile = session.putAttribute(invalidFlowFile, SyslogAttributes.SYSLOG_SENDER.key(), sender);
}
try {
@ -486,14 +486,14 @@ public class ListenSyslog extends AbstractSyslogProcessor {
getLogger().trace(event.getFullMessage());
final Map<String, String> attributes = new HashMap<>(numAttributes);
attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
attributes.put(SyslogAttributes.SYSLOG_PRIORITY.key(), event.getPriority());
attributes.put(SyslogAttributes.SYSLOG_SEVERITY.key(), event.getSeverity());
attributes.put(SyslogAttributes.SYSLOG_FACILITY.key(), event.getFacility());
attributes.put(SyslogAttributes.SYSLOG_VERSION.key(), event.getVersion());
attributes.put(SyslogAttributes.SYSLOG_TIMESTAMP.key(), event.getTimeStamp());
attributes.put(SyslogAttributes.SYSLOG_HOSTNAME.key(), event.getHostName());
attributes.put(SyslogAttributes.SYSLOG_BODY.key(), event.getMsgBody());
attributes.put(SyslogAttributes.SYSLOG_VALID.key(), String.valueOf(event.isValid()));
flowFile = session.putAllAttributes(flowFile, attributes);
}
@ -536,7 +536,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final Map<String, String> newAttributes = new HashMap<>(defaultAttributes.size() + 1);
newAttributes.putAll(defaultAttributes);
newAttributes.put(SyslogAttributes.SENDER.key(), sender);
newAttributes.put(SyslogAttributes.SYSLOG_SENDER.key(), sender);
flowFile = session.putAllAttributes(flowFile, newAttributes);
getLogger().debug("Transferring {} to success", new Object[] {flowFile});

View File

@ -17,15 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -45,10 +36,20 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.processors.standard.syslog.SyslogEvent;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@EventDriven
@ -145,13 +146,13 @@ public class ParseSyslog extends AbstractProcessor {
}
final Map<String, String> attributes = new HashMap<>(8);
attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
attributes.put(SyslogAttributes.SYSLOG_PRIORITY.key(), event.getPriority());
attributes.put(SyslogAttributes.SYSLOG_SEVERITY.key(), event.getSeverity());
attributes.put(SyslogAttributes.SYSLOG_FACILITY.key(), event.getFacility());
attributes.put(SyslogAttributes.SYSLOG_VERSION.key(), event.getVersion());
attributes.put(SyslogAttributes.SYSLOG_TIMESTAMP.key(), event.getTimeStamp());
attributes.put(SyslogAttributes.SYSLOG_HOSTNAME.key(), event.getHostName());
attributes.put(SyslogAttributes.SYSLOG_BODY.key(), event.getMsgBody());
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
import com.github.palindromicity.syslog.NilPolicy;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -40,15 +39,19 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.syslog.StrictSyslog5424Parser;
import org.apache.nifi.processors.standard.syslog.Syslog5424Event;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider;
import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
import org.apache.nifi.syslog.events.Syslog5424Event;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -82,9 +85,9 @@ import java.util.Set;
@SeeAlso({ListenSyslog.class, ParseSyslog.class, PutSyslog.class})
public class ParseSyslog5424 extends AbstractProcessor {
public static final AllowableValue OMIT = new AllowableValue(NilPolicy.OMIT.name(),NilPolicy.OMIT.name(),"The missing field will not have an attribute added.");
public static final AllowableValue NULL = new AllowableValue(NilPolicy.NULL.name(),NilPolicy.NULL.name(),"The missing field will have an empty attribute added.");
public static final AllowableValue DASH = new AllowableValue(NilPolicy.DASH.name(),NilPolicy.DASH.name(),"The missing field will have an attribute added with the value of '-'.");
public static final AllowableValue OMIT = new AllowableValue(NilHandlingPolicy.OMIT.name(),NilHandlingPolicy.OMIT.name(),"The missing field will not have an attribute added.");
public static final AllowableValue NULL = new AllowableValue(NilHandlingPolicy.NULL.name(),NilHandlingPolicy.NULL.name(),"The missing field will have an empty attribute added.");
public static final AllowableValue DASH = new AllowableValue(NilHandlingPolicy.DASH.name(),NilHandlingPolicy.DASH.name(),"The missing field will have an attribute added with the value of '-'.");
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
@ -149,7 +152,9 @@ public class ParseSyslog5424 extends AbstractProcessor {
public void onScheduled(final ProcessContext context) {
final String charsetName = context.getProperty(CHARSET).getValue();
final String nilPolicyString = context.getProperty(NIL_POLICY).getValue();
parser = new StrictSyslog5424Parser(Charset.forName(charsetName),NilPolicy.valueOf(nilPolicyString));
parser = new StrictSyslog5424Parser(Charset.forName(charsetName),
NilHandlingPolicy.valueOf(nilPolicyString),
NifiStructuredDataPolicy.FLATTEN,new SyslogPrefixedKeyProvider());
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@ -186,11 +191,18 @@ public class ParseSyslog5424 extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
return;
}
Map<String,String> attributeMap = syslogEvent.getFieldMap();
Map<String,String> attributeMap = convertMap(syslogEvent.getFieldMap());
if (!includeBody) {
attributeMap.remove(SyslogAttributes.BODY.key());
attributeMap.remove(SyslogAttributes.SYSLOG_BODY.key());
}
flowFile = session.putAllAttributes(flowFile, attributeMap);
session.transfer(flowFile, REL_SUCCESS);
}
private static Map<String,String> convertMap(Map<String, Object> map) {
Map<String,String> returnMap = new HashMap<>();
map.forEach((key,value) -> returnMap.put(key,(String)value));
return returnMap;
}
}

View File

@ -39,8 +39,8 @@ import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.syslog.parsers.SyslogParser;
import org.apache.nifi.util.StopWatch;
import javax.net.ssl.SSLContext;

View File

@ -1,206 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.syslog;
import com.github.palindromicity.syslog.KeyProvider;
import com.github.palindromicity.syslog.NilPolicy;
import com.github.palindromicity.syslog.SyslogParserBuilder;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
* Parses a Syslog message from a ByteBuffer into a Syslog5424Event instance.
* For 5424 we use simple-syslog-5424 since it parsers out structured data.
*/
public class StrictSyslog5424Parser {
private Charset charset;
private com.github.palindromicity.syslog.SyslogParser parser;
public StrictSyslog5424Parser() {
this(StandardCharsets.UTF_8, NilPolicy.NULL);
}
public StrictSyslog5424Parser(final Charset charset, final NilPolicy nilPolicy) {
this.charset = charset;
parser = new SyslogParserBuilder()
.withNilPolicy(nilPolicy)
.withKeyProvider(new NifiKeyProvider())
.build();
}
/**
* Parses a Syslog5424Event from a byte buffer.
*
* @param buffer a byte buffer containing a syslog message
* @return a Syslog5424Event parsed from the byte array
*/
public Syslog5424Event parseEvent(final ByteBuffer buffer) {
return parseEvent(buffer, null);
}
/**
* Parses a Syslog5424Event from a byte buffer.
*
* @param buffer a byte buffer containing a syslog message
* @param sender the hostname of the syslog server that sent the message
* @return a Syslog5424Event parsed from the byte array
*/
public Syslog5424Event parseEvent(final ByteBuffer buffer, final String sender) {
if (buffer == null) {
return null;
}
if (buffer.position() != 0) {
buffer.flip();
}
byte bytes[] = new byte[buffer.limit()];
buffer.get(bytes, 0, buffer.limit());
return parseEvent(bytes, sender);
}
/**
* Parses a Syslog5424Event from a byte array.
*
* @param bytes a byte array containing a syslog message
* @param sender the hostname of the syslog server that sent the message
* @return a Syslog5424Event parsed from the byte array
*/
public Syslog5424Event parseEvent(final byte[] bytes, final String sender) {
if (bytes == null || bytes.length == 0) {
return null;
}
// remove trailing new line before parsing
int length = bytes.length;
if (bytes[length - 1] == '\n') {
length = length - 1;
}
final String message = new String(bytes, 0, length, charset);
final Syslog5424Event.Builder builder = new Syslog5424Event.Builder()
.valid(false).fullMessage(message).rawMessage(bytes).sender(sender);
try {
parser.parseLine(message,(map)-> {
builder.fieldMap(convertMap(map));
});
builder.valid(true);
return builder.build();
} catch (Exception e) {
// this is not a valid 5424 message
builder.valid(false);
}
// either invalid w/original msg, or fully parsed event
return builder.build();
}
public String getCharsetName() {
return charset == null ? StandardCharsets.UTF_8.name() : charset.name();
}
private static Map<String,String> convertMap(Map<String, Object> map) {
Map<String,String> returnMap = new HashMap<>();
map.forEach((key,value) -> returnMap.put(key,(String)value));
return returnMap;
}
public static class NifiKeyProvider implements KeyProvider {
private Pattern pattern;
public NifiKeyProvider(){}
@Override
public String getMessage() {
return SyslogAttributes.BODY.key();
}
@Override
public String getHeaderAppName() {
return Syslog5424Attributes.APP_NAME.key();
}
@Override
public String getHeaderHostName() {
return SyslogAttributes.HOSTNAME.key();
}
@Override
public String getHeaderPriority() {
return SyslogAttributes.PRIORITY.key();
}
@Override
public String getHeaderFacility() {
return SyslogAttributes.FACILITY.key();
}
@Override
public String getHeaderSeverity() {
return SyslogAttributes.SEVERITY.key();
}
@Override
public String getHeaderProcessId() {
return Syslog5424Attributes.PROCID.key();
}
@Override
public String getHeaderTimeStamp() {
return SyslogAttributes.TIMESTAMP.key();
}
@Override
public String getHeaderMessageId() {
return Syslog5424Attributes.MESSAGEID.key();
}
@Override
public String getHeaderVersion() {
return SyslogAttributes.VERSION.key();
}
@Override
public String getStructuredBase() {
return Syslog5424Attributes.STRUCTURED_BASE.key();
}
@Override
public String getStructuredElementIdFormat() {
return Syslog5424Attributes.STRUCTURED_ELEMENT_ID_FMT.key();
}
@Override
public String getStructuredElementIdParamNameFormat() {
return Syslog5424Attributes.STRUCTURED_ELEMENT_ID_PNAME_FMT.key();
}
@Override
public Pattern getStructuredElementIdParamNamePattern() {
if (pattern == null) {
pattern = Pattern.compile(Syslog5424Attributes.STRUCTURED_ELEMENT_ID_PNAME_PATTERN.key());
}
return pattern;
}
}
}

View File

@ -18,7 +18,7 @@ package org.apache.nifi.processors.standard
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processors.standard.syslog.SyslogParser
import org.apache.nifi.syslog.parsers.SyslogParser
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.bouncycastle.util.encoders.Hex

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.processors.standard
import org.apache.nifi.processors.standard.syslog.SyslogParser
import org.apache.nifi.syslog.parsers.SyslogParser
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.bouncycastle.util.encoders.Hex

View File

@ -19,9 +19,9 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -301,16 +301,16 @@ public class ITListenSyslog {
private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) {
flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", ""));
Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key()));
Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key()));
Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.TIMESTAMP.key()));
Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.BODY.key()));
Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.VALID.key()));
Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.PORT.key()));
Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.PROTOCOL.key()));
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key())));
Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.SYSLOG_PRIORITY.key()));
Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SYSLOG_SEVERITY.key()));
Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.SYSLOG_FACILITY.key()));
Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.SYSLOG_TIMESTAMP.key()));
Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.SYSLOG_HOSTNAME.key()));
Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.SYSLOG_BODY.key()));
Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.SYSLOG_VALID.key()));
Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.SYSLOG_PORT.key()));
Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.SYSLOG_PROTOCOL.key()));
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SYSLOG_SENDER.key())));
}
/**

View File

@ -25,11 +25,11 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.processors.standard.syslog.SyslogEvent;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -98,9 +98,9 @@ public class TestListenSyslog {
runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
Assert.assertEquals("0", flowFile.getAttribute(SyslogAttributes.PORT.key()));
Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), flowFile.getAttribute(SyslogAttributes.PROTOCOL.key()));
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key())));
Assert.assertEquals("0", flowFile.getAttribute(SyslogAttributes.SYSLOG_PORT.key()));
Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), flowFile.getAttribute(SyslogAttributes.SYSLOG_PROTOCOL.key()));
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SYSLOG_SENDER.key())));
final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
final String[] splits = content.split("\\|");

View File

@ -17,7 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -46,12 +46,12 @@ public class TestParseSyslog {
runner.assertAllFlowFilesTransferred(ParseSyslog.REL_SUCCESS, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).get(0);
mff.assertAttributeEquals(SyslogAttributes.BODY.key(), BODY);
mff.assertAttributeEquals(SyslogAttributes.FACILITY.key(), FAC);
mff.assertAttributeEquals(SyslogAttributes.HOSTNAME.key(), HOST);
mff.assertAttributeEquals(SyslogAttributes.PRIORITY.key(), PRI);
mff.assertAttributeEquals(SyslogAttributes.SEVERITY.key(), SEV);
mff.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_BODY.key(), BODY);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_FACILITY.key(), FAC);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_HOSTNAME.key(), HOST);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_PRIORITY.key(), PRI);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_SEVERITY.key(), SEV);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_TIMESTAMP.key(), TIME);
}
@Test
@ -62,12 +62,12 @@ public class TestParseSyslog {
runner.assertAllFlowFilesTransferred(ParseSyslog.REL_SUCCESS, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).get(0);
mff.assertAttributeEquals(SyslogAttributes.BODY.key(), BODY);
mff.assertAttributeEquals(SyslogAttributes.FACILITY.key(), FAC);
mff.assertAttributeEquals(SyslogAttributes.HOSTNAME.key(), IPV6SRC);
mff.assertAttributeEquals(SyslogAttributes.PRIORITY.key(), PRI);
mff.assertAttributeEquals(SyslogAttributes.SEVERITY.key(), SEV);
mff.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_BODY.key(), BODY);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_FACILITY.key(), FAC);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_HOSTNAME.key(), IPV6SRC);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_PRIORITY.key(), PRI);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_SEVERITY.key(), SEV);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_TIMESTAMP.key(), TIME);
}
@Test
@ -78,12 +78,12 @@ public class TestParseSyslog {
runner.assertAllFlowFilesTransferred(ParseSyslog.REL_SUCCESS, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).get(0);
mff.assertAttributeEquals(SyslogAttributes.BODY.key(), BODY);
mff.assertAttributeEquals(SyslogAttributes.FACILITY.key(), FAC);
mff.assertAttributeEquals(SyslogAttributes.HOSTNAME.key(), IPV4SRC);
mff.assertAttributeEquals(SyslogAttributes.PRIORITY.key(), PRI);
mff.assertAttributeEquals(SyslogAttributes.SEVERITY.key(), SEV);
mff.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_BODY.key(), BODY);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_FACILITY.key(), FAC);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_HOSTNAME.key(), IPV4SRC);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_PRIORITY.key(), PRI);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_SEVERITY.key(), SEV);
mff.assertAttributeEquals(SyslogAttributes.SYSLOG_TIMESTAMP.key(), TIME);
}
@Test

View File

@ -18,7 +18,7 @@
package org.apache.nifi.processors.standard;
import com.github.palindromicity.syslog.NilPolicy;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -73,7 +73,7 @@ public class TestParseSyslog5424 {
runner.run();
runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_SUCCESS,1);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(ParseSyslog5424.REL_SUCCESS);
Assert.assertNotNull(results.get(0).getAttribute(SyslogAttributes.BODY.key()));
Assert.assertNotNull(results.get(0).getAttribute(SyslogAttributes.SYSLOG_BODY.key()));
}
@Test
@ -85,7 +85,7 @@ public class TestParseSyslog5424 {
runner.run();
runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_SUCCESS,1);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(ParseSyslog5424.REL_SUCCESS);
Assert.assertNotNull(results.get(0).getAttribute(SyslogAttributes.BODY.key()));
Assert.assertNotNull(results.get(0).getAttribute(SyslogAttributes.SYSLOG_BODY.key()));
}
@Test
@ -97,6 +97,6 @@ public class TestParseSyslog5424 {
runner.run();
runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_SUCCESS,1);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(ParseSyslog5424.REL_SUCCESS);
Assert.assertNull(results.get(0).getAttribute(SyslogAttributes.BODY.key()));
Assert.assertNull(results.get(0).getAttribute(SyslogAttributes.SYSLOG_BODY.key()));
}
}

View File

@ -380,11 +380,6 @@
<artifactId>avro</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>com.github.palindromicity</groupId>
<artifactId>simple-syslog-5424</artifactId>
<version>0.0.5</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -77,4 +77,12 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies:
Apache Avro
Copyright 2009-2017 The Apache Software Foundation
(ASLv2) simple-syslog-5424
The following NOTICE information applies:
simple-syslog-5424
https://github.com/palindromicity/simple-syslog-5424
Copyright 2018 simple-syslog-5424 authors.

View File

@ -46,6 +46,11 @@
<artifactId>nifi-avro-record-utils</artifactId>
<version>1.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-syslog-utils</artifactId>
<version>1.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
@ -145,6 +150,10 @@
<exclude>src/test/resources/json/output/dataTypes.json</exclude>
<exclude>src/test/resources/json/elements-for-record-choice.json</exclude>
<exclude>src/test/resources/json/record-choice.avsc</exclude>
<exclude>src/test/resources/syslog/syslog5424/log.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_all.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_mix.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_mix_in_error.txt</exclude>
<exclude>src/test/resources/xml/people.xml</exclude>
<exclude>src/test/resources/xml/people2.xml</exclude>
<exclude>src/test/resources/xml/people3.xml</exclude>

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.keyproviders.SimpleKeyProvider;
import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Tags({"syslog 5424", "syslog", "logs", "logfiles", "parse", "text", "record", "reader"})
@CapabilityDescription("Provides a mechanism for reading RFC 5424 compliant Syslog data, such as log files, and structuring the data" +
" so that it can be processed.")
public class Syslog5424Reader extends SchemaRegistryService implements RecordReaderFactory {
public static final String RFC_5424_SCHEMA_NAME = "default-5424-schema";
static final AllowableValue RFC_5424_SCHEMA = new AllowableValue(RFC_5424_SCHEMA_NAME, "Use RFC 5424 Schema",
"The schema will be the default schema per RFC 5424.");
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies which character set of the Syslog messages")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
private volatile StrictSyslog5424Parser parser;
private volatile RecordSchema recordSchema;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(1);
properties.add(CHARSET);
return properties;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final String charsetName = context.getProperty(CHARSET).getValue();
parser = new StrictSyslog5424Parser(Charset.forName(charsetName), NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider());
recordSchema = createRecordSchema();
}
@Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {
final List<AllowableValue> allowableValues = new ArrayList<>();
allowableValues.add(RFC_5424_SCHEMA);
return allowableValues;
}
@Override
protected AllowableValue getDefaultSchemaAccessStrategy() {
return RFC_5424_SCHEMA;
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
return createAccessStrategy();
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ValidationContext context) {
return createAccessStrategy();
}
static RecordSchema createRecordSchema() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField(SyslogAttributes.PRIORITY.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.SEVERITY.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.FACILITY.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.VERSION.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.TIMESTAMP.key(), RecordFieldType.TIMESTAMP.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.HOSTNAME.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.BODY.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(Syslog5424Attributes.APP_NAME.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(Syslog5424Attributes.PROCID.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(Syslog5424Attributes.MESSAGEID.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(Syslog5424Attributes.STRUCTURED_BASE.key(),
RecordFieldType.MAP.getMapDataType(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()))));
SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder().name(RFC_5424_SCHEMA_NAME).build();
final RecordSchema schema = new SimpleRecordSchema(fields,schemaIdentifier);
return schema;
}
private SchemaAccessStrategy createAccessStrategy() {
return new SchemaAccessStrategy() {
private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
@Override
public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException {
return recordSchema;
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return schemaFields;
}
};
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new Syslog5424RecordReader(parser, in, schema);
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.Syslog5424Event;
import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
import org.apache.nifi.util.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
public class Syslog5424RecordReader implements RecordReader {
private final BufferedReader reader;
private RecordSchema schema;
private final StrictSyslog5424Parser parser;
public Syslog5424RecordReader(StrictSyslog5424Parser parser, InputStream in, RecordSchema schema){
this.reader = new BufferedReader(new InputStreamReader(in));
this.schema = schema;
this.parser = parser;
}
@Override
public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
String line = reader.readLine();
if ( line == null ) {
// a null return from readLine() signals the end of the stream
return null;
}
if (StringUtils.isBlank(line)) {
// while an empty string is an error
throw new MalformedRecordException("Encountered a blank message!");
}
final MalformedRecordException malformedRecordException;
Syslog5424Event event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
if (!event.isValid()) {
if (event.getException() != null) {
malformedRecordException = new MalformedRecordException(
String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC "+
"formats supported", line), event.getException());
} else {
malformedRecordException = new MalformedRecordException(
String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" +
" formats supported", line));
}
throw malformedRecordException;
}
Map<String,Object> modifiedMap = new HashMap<>(event.getFieldMap());
modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key())));
return new MapRecord(schema,modifiedMap);
}
@Override
public RecordSchema getSchema() throws MalformedRecordException {
return schema;
}
@Override
public void close() throws IOException {
this.reader.close();
}
private Timestamp convertTimeStamp(String timeString) {
/*
From RFC 5424: https://tools.ietf.org/html/rfc5424#page-11
The TIMESTAMP field is a formalized timestamp derived from [RFC3339].
Whereas [RFC3339] makes allowances for multiple syntaxes, this
document imposes further restrictions. The TIMESTAMP value MUST
follow these restrictions:
o The "T" and "Z" characters in this syntax MUST be upper case.
o Usage of the "T" character is REQUIRED.
o Leap seconds MUST NOT be used.
*/
if (timeString == null) {
return null;
}
return Timestamp.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(timeString)));
}
}

View File

@ -25,7 +25,7 @@ org.apache.nifi.csv.CSVRecordSetWriter
org.apache.nifi.grok.GrokReader
org.apache.nifi.text.FreeFormTextRecordSetWriter
org.apache.nifi.syslog.Syslog5424Reader
org.apache.nifi.xml.XMLReader
org.apache.nifi.xml.XMLRecordSetWriter

View File

@ -0,0 +1,91 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>Syslog5424Reader</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<p>
The Syslog5424Reader Controller Service provides a means for parsing valid <a href="https://tools.ietf.org/html/rfc5424">RFC 5424 Syslog</a> messages.
This service produces records with a set schema to match the specification.
</p>
<p>
The Required Property of this service is named <code>Character Set</code> and specifies the Character Set of the incoming text.
</p>
<h2>Schemas</h2>
<p>
When a record is parsed from incoming data, it is parsed into the RFC 5424 schema.
<h4>The RFC 5424 schema</h4>
<code><pre>
{
"type" : "record",
"name" : "nifiRecord",
"namespace" : "org.apache.nifi",
"fields" : [ {
"name" : "priority",
"type" : [ "null", "string" ]
}, {
"name" : "severity",
"type" : [ "null", "string" ]
}, {
"name" : "facility",
"type" : [ "null", "string" ]
}, {
"name" : "version",
"type" : [ "null", "string" ]
}, {
"name" : "timestamp",
"type" : [ "null", {
"type" : "long",
"logicalType" : "timestamp-millis"
} ]
}, {
"name" : "hostname",
"type" : [ "null", "string" ]
}, {
"name" : "body",
"type" : [ "null", "string" ]
},
"name" : "appName",
"type" : [ "null", "string" ]
}, {
"name" : "procid",
"type" : [ "null", "string" ]
}, {
"name" : "messageid",
"type" : [ "null", "string" ]
}, {
"name" : "structuredData",
"type" : [ "null", {
"type" : "map",
"values" : {
"type" : "map",
"values" : "string"
}
} ]
} ]
}
</pre></code>
</p>
</body>
</html>

View File

@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.keyproviders.SimpleKeyProvider;
import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class TestSyslog5424RecordReader {
private static final Charset CHARSET = Charset.forName("UTF-8");
private static final String expectedVersion = "1";
private static final String expectedMessage = "Removing instance";
private static final String expectedAppName = "d0602076-b14a-4c55-852a-981e7afeed38";
private static final String expectedHostName = "loggregator";
private static final String expectedPri = "14";
private static final String expectedProcId = "DEA";
private static final Timestamp expectedTimestamp = Timestamp.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse("2014-06-20T09:14:07+00:00")));
private static final String expectedMessageId = "MSG-01";
private static final String expectedFacility = "1";
private static final String expectedSeverity = "6";
private static final String expectedIUT1 = "3";
private static final String expectedIUT2 = "4";
private static final String expectedEventSource1 = "Application";
private static final String expectedEventSource2 = "Other Application";
private static final String expectedEventID1 = "1011";
private static final String expectedEventID2 = "2022";
@Test
@SuppressWarnings("unchecked")
public void testParseSingleLine() throws IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_all.txt"))) {
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.MAP_OF_MAPS,
new SimpleKeyProvider());
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema());
final Record record = deserializer.nextRecord();
assertNotNull(record.getValues());
Assert.assertEquals(expectedVersion, record.getAsString(SyslogAttributes.VERSION.key()));
Assert.assertEquals(expectedMessage, record.getAsString(SyslogAttributes.BODY.key()));
Assert.assertEquals(expectedAppName, record.getAsString(Syslog5424Attributes.APP_NAME.key()));
Assert.assertEquals(expectedHostName, record.getAsString(SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(expectedPri, record.getAsString(SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(expectedSeverity, record.getAsString(SyslogAttributes.SEVERITY.key()));
Assert.assertEquals(expectedFacility, record.getAsString(SyslogAttributes.FACILITY.key()));
Assert.assertEquals(expectedProcId, record.getAsString(Syslog5424Attributes.PROCID.key()));
Assert.assertEquals(expectedTimestamp, (Timestamp)record.getValue(SyslogAttributes.TIMESTAMP.key()));
Assert.assertEquals(expectedMessageId, record.getAsString(Syslog5424Attributes.MESSAGEID.key()));
Assert.assertNotNull(record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key()));
Map<String,Object> structured = (Map<String,Object>)record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key());
Assert.assertTrue(structured.containsKey("exampleSDID@32473"));
Map<String, Object> example1 = (Map<String, Object>) structured.get("exampleSDID@32473");
Assert.assertTrue(example1.containsKey("iut"));
Assert.assertTrue(example1.containsKey("eventSource"));
Assert.assertTrue(example1.containsKey("eventID"));
Assert.assertEquals(expectedIUT1, example1.get("iut").toString());
Assert.assertEquals(expectedEventSource1, example1.get("eventSource").toString());
Assert.assertEquals(expectedEventID1, example1.get("eventID").toString());
Assert.assertTrue(structured.containsKey("exampleSDID@32480"));
Map<String, Object> example2 = (Map<String, Object>) structured.get("exampleSDID@32480");
Assert.assertTrue(example2.containsKey("iut"));
Assert.assertTrue(example2.containsKey("eventSource"));
Assert.assertTrue(example2.containsKey("eventID"));
Assert.assertEquals(expectedIUT2, example2.get("iut").toString());
Assert.assertEquals(expectedEventSource2, example2.get("eventSource").toString());
Assert.assertEquals(expectedEventID2, example2.get("eventID").toString());
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@Test
@SuppressWarnings("unchecked")
public void testParseSingleLineSomeNulls() throws IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log.txt"))) {
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.MAP_OF_MAPS,
new SimpleKeyProvider());
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema());
final Record record = deserializer.nextRecord();
assertNotNull(record.getValues());
Assert.assertEquals(expectedVersion, record.getAsString(SyslogAttributes.VERSION.key()));
Assert.assertEquals(expectedMessage, record.getAsString(SyslogAttributes.BODY.key()));
Assert.assertEquals(expectedAppName, record.getAsString(Syslog5424Attributes.APP_NAME.key()));
Assert.assertEquals(expectedHostName, record.getAsString(SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(expectedPri, record.getAsString(SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(expectedSeverity, record.getAsString(SyslogAttributes.SEVERITY.key()));
Assert.assertEquals(expectedFacility, record.getAsString(SyslogAttributes.FACILITY.key()));
Assert.assertEquals(expectedProcId, record.getAsString(Syslog5424Attributes.PROCID.key()));
Assert.assertEquals(expectedTimestamp, (Timestamp)record.getValue(SyslogAttributes.TIMESTAMP.key()));
Assert.assertNull(record.getAsString(Syslog5424Attributes.MESSAGEID.key()));
Assert.assertNotNull(record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key()));
Map<String,Object> structured = (Map<String,Object>)record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key());
Assert.assertTrue(structured.containsKey("exampleSDID@32473"));
Map<String, Object> example1 = (Map<String, Object>) structured.get("exampleSDID@32473");
Assert.assertTrue(example1.containsKey("iut"));
Assert.assertTrue(example1.containsKey("eventSource"));
Assert.assertTrue(example1.containsKey("eventID"));
Assert.assertEquals(expectedIUT1, example1.get("iut").toString());
Assert.assertEquals(expectedEventSource1, example1.get("eventSource").toString());
Assert.assertEquals(expectedEventID1, example1.get("eventID").toString());
Assert.assertTrue(structured.containsKey("exampleSDID@32480"));
Map<String, Object> example2 = (Map<String, Object>) structured.get("exampleSDID@32480");
Assert.assertTrue(example2.containsKey("iut"));
Assert.assertTrue(example2.containsKey("eventSource"));
Assert.assertTrue(example2.containsKey("eventID"));
Assert.assertEquals(expectedIUT2, example2.get("iut").toString());
Assert.assertEquals(expectedEventSource2, example2.get("eventSource").toString());
Assert.assertEquals(expectedEventID2, example2.get("eventID").toString());
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@Test
@SuppressWarnings("unchecked")
public void testParseMultipleLine() throws IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_mix.txt"))) {
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.MAP_OF_MAPS,
new SimpleKeyProvider());
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema());
Record record = deserializer.nextRecord();
int count = 0;
while (record != null){
assertNotNull(record.getValues());
count++;
record = deserializer.nextRecord();
}
Assert.assertEquals(count, 3);
deserializer.close();
}
}
@Test
@SuppressWarnings("unchecked")
public void testParseMultipleLineWithError() throws IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_mix_in_error.txt"))) {
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.MAP_OF_MAPS,
new SimpleKeyProvider());
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema());
Record record = deserializer.nextRecord();
int count = 0;
int exceptionCount = 0;
while (record != null){
assertNotNull(record.getValues());
try {
record = deserializer.nextRecord();
count++;
} catch (Exception e) {
exceptionCount++;
}
}
Assert.assertEquals(count, 3);
Assert.assertEquals(exceptionCount,1);
deserializer.close();
}
}
public void writeSchema() {
String s = Syslog5424Reader.createRecordSchema().toString();
System.out.println(s);
System.out.println(AvroTypeUtil.extractAvroSchema( Syslog5424Reader.createRecordSchema() ).toString(true));
}
}

View File

@ -0,0 +1 @@
<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA - [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance

View File

@ -0,0 +1 @@
<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance

View File

@ -0,0 +1,3 @@
<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA - - Removing instance
<14>1 2014-06-20T09:14:07Z loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance
<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance

View File

@ -0,0 +1,4 @@
<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA - - Removing instance
POISONPILL 30303030
<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance
<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance