NIFI-5432 Add Syslog Record Reader legacy Syslog

- Add additional details for schema

This closes #2900.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Otto Fowler 2018-07-16 17:58:13 -04:00 committed by Bryan Bende
parent b191f6a62a
commit 0a493bf7fd
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
5 changed files with 517 additions and 0 deletions

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.parsers.SyslogParser;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Tags({"syslog", "logs", "logfiles", "parse", "text", "record", "reader"})
@CapabilityDescription("Attempts to parses the contents of a Syslog message in accordance to RFC5424 and RFC3164. In the " +
"case of RFC5424 formatted messages, structured data is not supported, and will be returned as part of the message." +
"Note: Be mindfull that RFC3164 is informational and a wide range of different implementations are present in" +
" the wild.")
public class SyslogReader extends SchemaRegistryService implements RecordReaderFactory {
public static final String GENERIC_SYSLOG_SCHEMA_NAME = "default-syslog-schema";
static final AllowableValue GENERIC_SYSLOG_SCHEMA = new AllowableValue(GENERIC_SYSLOG_SCHEMA_NAME, "Use Generic Syslog Schema",
"The schema will be the default Syslog schema.");
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies which character set of the Syslog messages")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
private volatile SyslogParser parser;
private volatile RecordSchema recordSchema;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(1);
properties.add(CHARSET);
return properties;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final String charsetName = context.getProperty(CHARSET).getValue();
parser = new SyslogParser(Charset.forName(charsetName));
recordSchema = createRecordSchema();
}
@Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {
final List<AllowableValue> allowableValues = new ArrayList<>();
allowableValues.add(GENERIC_SYSLOG_SCHEMA);
return allowableValues;
}
@Override
protected AllowableValue getDefaultSchemaAccessStrategy() {
return GENERIC_SYSLOG_SCHEMA;
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
return createAccessStrategy();
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ValidationContext context) {
return createAccessStrategy();
}
static RecordSchema createRecordSchema() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField(SyslogAttributes.PRIORITY.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.SEVERITY.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.FACILITY.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.VERSION.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.TIMESTAMP.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.HOSTNAME.key(), RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.BODY.key(), RecordFieldType.STRING.getDataType(), true));
SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder().name(GENERIC_SYSLOG_SCHEMA_NAME).build();
final RecordSchema schema = new SimpleRecordSchema(fields,schemaIdentifier);
return schema;
}
private SchemaAccessStrategy createAccessStrategy() {
return new SchemaAccessStrategy() {
private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
@Override
public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException {
return recordSchema;
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return schemaFields;
}
};
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new SyslogRecordReader(parser, in, schema);
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser;
import org.apache.nifi.util.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class SyslogRecordReader implements RecordReader {
private final BufferedReader reader;
private RecordSchema schema;
private final SyslogParser parser;
public SyslogRecordReader(SyslogParser parser, InputStream in, RecordSchema schema) {
this.reader = new BufferedReader(new InputStreamReader(in));
this.schema = schema;
this.parser = parser;
}
@Override
public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
String line = reader.readLine();
if (line == null) {
// a null return from readLine() signals the end of the stream
return null;
}
if (StringUtils.isBlank(line)) {
// while an empty string is an error
throw new MalformedRecordException("Encountered a blank message!");
}
final MalformedRecordException malformedRecordException;
SyslogEvent event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
if (!event.isValid()) {
malformedRecordException = new MalformedRecordException(
String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" +
" formats supported", line));
throw malformedRecordException;
}
final Map<String, Object> syslogMap = new HashMap<>(8);
syslogMap.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
syslogMap.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
syslogMap.put(SyslogAttributes.FACILITY.key(), event.getFacility());
syslogMap.put(SyslogAttributes.VERSION.key(), event.getVersion());
syslogMap.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
syslogMap.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
syslogMap.put(SyslogAttributes.BODY.key(), event.getMsgBody());
return new MapRecord(schema, syslogMap);
}
@Override
public RecordSchema getSchema() throws MalformedRecordException {
return schema;
}
@Override
public void close() throws IOException {
this.reader.close();
}
}

View File

@ -25,6 +25,7 @@ org.apache.nifi.csv.CSVRecordSetWriter
org.apache.nifi.grok.GrokReader
org.apache.nifi.syslog.SyslogReader
org.apache.nifi.syslog.Syslog5424Reader
org.apache.nifi.xml.XMLReader

View File

@ -0,0 +1,70 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>Syslog5424Reader</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<p>
The SyslogReader Controller Service provides a means to parse the contents of a Syslog message in accordance to RFC5424 and RFC3164
formats. This reader produces records with a set schema to match the common set of fields between the specifications.
</p>
<p>
The Required Property of this service is named <code>Character Set</code> and specifies the Character Set of the incoming text.
</p>
<h2>Schemas</h2>
<p>
When a record is parsed from incoming data, it is parsed into the Generic Syslog Schema.
<h4>The Generic Syslog Schema</h4>
<code><pre>
{
"type" : "record",
"name" : "nifiRecord",
"namespace" : "org.apache.nifi",
"fields" : [ {
"name" : "priority",
"type" : [ "null", "string" ]
}, {
"name" : "severity",
"type" : [ "null", "string" ]
}, {
"name" : "facility",
"type" : [ "null", "string" ]
}, {
"name" : "version",
"type" : [ "null", "string" ]
}, {
"name" : "timestamp",
"type" : [ "null", "string" ]
}, {
"name" : "hostname",
"type" : [ "null", "string" ]
}, {
"name" : "body",
"type" : [ "null", "string" ]
} ]
}
</pre></code>
</p>
</body>
</html>

View File

@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.syslog;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.parsers.SyslogParser;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class TestSyslogRecordReader {
private static final Charset CHARSET = Charset.forName("UTF-8");
private static final String PRI = "34";
private static final String SEV = "2";
private static final String FAC = "4";
private static final String TIME = "Oct 13 15:43:23";
private static final String HOST = "localhost.home";
private static final String IPV6SRC = "fe80::216:3300:eeaa:eeaa";
private static final String IPV4SRC = "8.8.4.4";
private static final String BODY = "some message";
static final String VALID_MESSAGE_RFC3164_0 = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
static final String VALID_MESSAGE_RFC3164_1 = "<" + PRI + ">" + TIME + " " + IPV6SRC + " " + BODY + "\n";
static final String VALID_MESSAGE_RFC3164_2 = "<" + PRI + ">" + TIME + " " + IPV4SRC + " " + BODY + "\n";
private static final String expectedVersion = "1";
private static final String expectedAppName = "d0602076-b14a-4c55-852a-981e7afeed38";
private static final String expectedHostName = "loggregator";
private static final String expectedProcId = "DEA";
private static final String expectedMessageId = "MSG-01";
private static final String expectedMessage = expectedAppName +
" " + expectedProcId +
" " + expectedMessageId +
" " + "[exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] [exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"]" +
" " + "Removing instance";
private static final String expectedPri = "14";
private static final String expectedTimestamp = "2014-06-20T09:14:07+00:00";
private static final String expectedFacility = "1";
private static final String expectedSeverity = "6";
@Test
@SuppressWarnings("unchecked")
public void testParseSingleLine() throws IOException, MalformedRecordException {
try (final InputStream fis = new ByteArrayInputStream(VALID_MESSAGE_RFC3164_0.getBytes(CHARSET))){
SyslogParser parser = new SyslogParser(CHARSET);
final SyslogRecordReader deserializer = new SyslogRecordReader(parser, fis, SyslogReader.createRecordSchema());
final Record record = deserializer.nextRecord();
assertNotNull(record.getValues());
Assert.assertEquals(BODY, record.getAsString(SyslogAttributes.BODY.key()));
Assert.assertEquals(HOST, record.getAsString(SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(PRI, record.getAsString(SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(SEV, record.getAsString(SyslogAttributes.SEVERITY.key()));
Assert.assertEquals(FAC, record.getAsString(SyslogAttributes.FACILITY.key()));
Assert.assertEquals(TIME, record.getAsString(SyslogAttributes.TIMESTAMP.key()));
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@Test
@SuppressWarnings("unchecked")
public void testParseSingleLineIPV6() throws IOException, MalformedRecordException {
try (final InputStream fis = new ByteArrayInputStream(VALID_MESSAGE_RFC3164_1.getBytes(CHARSET))){
SyslogParser parser = new SyslogParser(CHARSET);
final SyslogRecordReader deserializer = new SyslogRecordReader(parser, fis, SyslogReader.createRecordSchema());
final Record record = deserializer.nextRecord();
assertNotNull(record.getValues());
Assert.assertEquals(BODY, record.getAsString(SyslogAttributes.BODY.key()));
Assert.assertEquals(IPV6SRC, record.getAsString(SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(PRI, record.getAsString(SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(SEV, record.getAsString(SyslogAttributes.SEVERITY.key()));
Assert.assertEquals(FAC, record.getAsString(SyslogAttributes.FACILITY.key()));
Assert.assertEquals(TIME, record.getAsString(SyslogAttributes.TIMESTAMP.key()));
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@Test
@SuppressWarnings("unchecked")
public void testParseSingleLineIPV4() throws IOException, MalformedRecordException {
try (final InputStream fis = new ByteArrayInputStream(VALID_MESSAGE_RFC3164_2.getBytes(CHARSET))){
SyslogParser parser = new SyslogParser(CHARSET);
final SyslogRecordReader deserializer = new SyslogRecordReader(parser, fis, SyslogReader.createRecordSchema());
final Record record = deserializer.nextRecord();
assertNotNull(record.getValues());
Assert.assertEquals(BODY, record.getAsString(SyslogAttributes.BODY.key()));
Assert.assertEquals(IPV4SRC, record.getAsString(SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(PRI, record.getAsString(SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(SEV, record.getAsString(SyslogAttributes.SEVERITY.key()));
Assert.assertEquals(FAC, record.getAsString(SyslogAttributes.FACILITY.key()));
Assert.assertEquals(TIME, record.getAsString(SyslogAttributes.TIMESTAMP.key()));
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@Test
@SuppressWarnings("unchecked")
public void testParseMultipleLine() throws IOException, MalformedRecordException {
try (final InputStream fis = new ByteArrayInputStream((VALID_MESSAGE_RFC3164_0 + VALID_MESSAGE_RFC3164_1 + VALID_MESSAGE_RFC3164_2).getBytes(CHARSET))) {
SyslogParser parser = new SyslogParser(CHARSET);
final SyslogRecordReader deserializer = new SyslogRecordReader(parser, fis, SyslogReader.createRecordSchema());
Record record = deserializer.nextRecord();
int count = 0;
while (record != null){
assertNotNull(record.getValues());
count++;
record = deserializer.nextRecord();
}
Assert.assertEquals(count, 3);
deserializer.close();
}
}
@Test
@SuppressWarnings("unchecked")
public void testParseMultipleLineWithError() throws IOException, MalformedRecordException {
try (final InputStream fis = new ByteArrayInputStream((VALID_MESSAGE_RFC3164_0 + "\n" + VALID_MESSAGE_RFC3164_1 + VALID_MESSAGE_RFC3164_2).getBytes(CHARSET))) {
SyslogParser parser = new SyslogParser(CHARSET);
final SyslogRecordReader deserializer = new SyslogRecordReader(parser, fis, SyslogReader.createRecordSchema());
Record record = deserializer.nextRecord();
int count = 0;
int exceptionCount = 0;
while (record != null){
assertNotNull(record.getValues());
try {
record = deserializer.nextRecord();
count++;
} catch (Exception e) {
exceptionCount++;
}
}
Assert.assertEquals(count, 3);
Assert.assertEquals(exceptionCount,1);
deserializer.close();
}
}
@Test
@SuppressWarnings("unchecked")
public void testParseSingleLine5424() throws IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_all.txt"))) {
SyslogParser parser = new SyslogParser(CHARSET);
final SyslogRecordReader deserializer = new SyslogRecordReader(parser, fis, SyslogReader.createRecordSchema());
final Record record = deserializer.nextRecord();
assertNotNull(record.getValues());
Assert.assertEquals(expectedVersion, record.getAsString(SyslogAttributes.VERSION.key()));
Assert.assertEquals(expectedMessage, record.getAsString(SyslogAttributes.BODY.key()));
Assert.assertEquals(expectedHostName, record.getAsString(SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(expectedPri, record.getAsString(SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(expectedSeverity, record.getAsString(SyslogAttributes.SEVERITY.key()));
Assert.assertEquals(expectedFacility, record.getAsString(SyslogAttributes.FACILITY.key()));
Assert.assertEquals(expectedTimestamp, record.getAsString(SyslogAttributes.TIMESTAMP.key()));
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
public void writeSchema() {
String s = SyslogReader.createRecordSchema().toString();
System.out.println(s);
System.out.println(AvroTypeUtil.extractAvroSchema( SyslogReader.createRecordSchema() ).toString(true));
}
}