mirror of https://github.com/apache/nifi.git
NIFI-12163 This closes #7835. Improved Syslog 5424 Line Handling
- Eliminated unused parseEvent method signatures from StrictSyslog5424Parser in favor of a single String line method - Eliminated intermediate conversion from String to byte array and back to String for Syslog Parser Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
parent
6394912cce
commit
da4c6f6e25
|
@ -21,29 +21,18 @@ import com.github.palindromicity.syslog.NilPolicy;
|
|||
import com.github.palindromicity.syslog.StructuredDataPolicy;
|
||||
import com.github.palindromicity.syslog.SyslogParserBuilder;
|
||||
import org.apache.nifi.syslog.events.Syslog5424Event;
|
||||
import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider;
|
||||
import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
|
||||
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* Parses a Syslog message from a ByteBuffer into a Syslog5424Event instance.
|
||||
* For 5424 we use simple-syslog-5424 since it parsers out structured data.
|
||||
*/
|
||||
public class StrictSyslog5424Parser {
|
||||
private Charset charset;
|
||||
private com.github.palindromicity.syslog.SyslogParser parser;
|
||||
private final com.github.palindromicity.syslog.SyslogParser parser;
|
||||
|
||||
public StrictSyslog5424Parser() {
|
||||
this(StandardCharsets.UTF_8, NilHandlingPolicy.NULL, NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
|
||||
}
|
||||
|
||||
public StrictSyslog5424Parser(final Charset charset, final NilHandlingPolicy nilPolicy,
|
||||
NifiStructuredDataPolicy structuredDataPolicy, KeyProvider keyProvider) {
|
||||
this.charset = charset;
|
||||
public StrictSyslog5424Parser(final NilHandlingPolicy nilPolicy,
|
||||
final NifiStructuredDataPolicy structuredDataPolicy, final KeyProvider keyProvider) {
|
||||
parser = new SyslogParserBuilder()
|
||||
.withNilPolicy(NilPolicy.valueOf(nilPolicy.name()))
|
||||
.withStructuredDataPolicy(StructuredDataPolicy.valueOf(structuredDataPolicy.name()))
|
||||
|
@ -52,54 +41,17 @@ public class StrictSyslog5424Parser {
|
|||
}
|
||||
|
||||
/**
|
||||
* Parses a Syslog5424Event from a {@code ByteBuffer}.
|
||||
* Parses a Syslog5424Event from a String
|
||||
*
|
||||
* @param buffer a {@code ByteBuffer} containing a syslog message
|
||||
* @return a Syslog5424Event parsed from the {@code {@code byte array}}
|
||||
* @param line a {@code String} containing a syslog message
|
||||
* @return a Syslog5424Event parsed from the input line
|
||||
*/
|
||||
public Syslog5424Event parseEvent(final ByteBuffer buffer) {
|
||||
return parseEvent(buffer, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a Syslog5424Event from a {@code ByteBuffer}.
|
||||
*
|
||||
* @param buffer a {@code ByteBuffer} containing a syslog message
|
||||
* @param sender the hostname of the syslog server that sent the message
|
||||
* @return a Syslog5424Event parsed from the {@code byte array}
|
||||
*/
|
||||
public Syslog5424Event parseEvent(final ByteBuffer buffer, final String sender) {
|
||||
if (buffer == null) {
|
||||
return null;
|
||||
}
|
||||
return parseEvent(bufferToBytes(buffer), sender);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a Syslog5424Event from a {@code byte array}.
|
||||
*
|
||||
* @param bytes a {@code byte array} containing a syslog message
|
||||
* @param sender the hostname of the syslog server that sent the message
|
||||
* @return a Syslog5424Event parsed from the {@code byte array}
|
||||
*/
|
||||
public Syslog5424Event parseEvent(final byte[] bytes, final String sender) {
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// remove trailing new line before parsing
|
||||
int length = bytes.length;
|
||||
if (bytes[length - 1] == '\n') {
|
||||
length = length - 1;
|
||||
}
|
||||
|
||||
final String message = new String(bytes, 0, length, charset);
|
||||
|
||||
public Syslog5424Event parseEvent(final String line) {
|
||||
final Syslog5424Event.Builder builder = new Syslog5424Event.Builder()
|
||||
.valid(false).fullMessage(message).rawMessage(bytes).sender(sender);
|
||||
.valid(false).fullMessage(line);
|
||||
|
||||
try {
|
||||
parser.parseLine(message, builder::fieldMap);
|
||||
parser.parseLine(line, builder::fieldMap);
|
||||
builder.valid(true);
|
||||
} catch (Exception e) {
|
||||
// this is not a valid 5424 message
|
||||
|
@ -110,21 +62,4 @@ public class StrictSyslog5424Parser {
|
|||
// either invalid w/original msg, or fully parsed event
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public String getCharsetName() {
|
||||
return charset == null ? StandardCharsets.UTF_8.name() : charset.name();
|
||||
}
|
||||
|
||||
|
||||
private byte[] bufferToBytes(ByteBuffer buffer) {
|
||||
if (buffer == null) {
|
||||
return null;
|
||||
}
|
||||
if (buffer.position() != 0) {
|
||||
buffer.flip();
|
||||
}
|
||||
byte bytes[] = new byte[buffer.limit()];
|
||||
buffer.get(bytes, 0, buffer.limit());
|
||||
return bytes;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,9 +26,6 @@ import org.apache.nifi.syslog.utils.NilHandlingPolicy;
|
|||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.nio.Buffer;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -42,27 +39,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
|
||||
public abstract class BaseStrictSyslog5424ParserTest {
|
||||
|
||||
private static final Charset CHARSET = Charset.forName("UTF-8");
|
||||
private static final String NIL_VALUE = "-";
|
||||
private StrictSyslog5424Parser parser;
|
||||
|
||||
protected abstract NilHandlingPolicy getPolicy();
|
||||
|
||||
protected void validateForPolicy(String expected, Object actual) {
|
||||
switch (getPolicy()) {
|
||||
case DASH:
|
||||
assertEquals(actual, NIL_VALUE);
|
||||
break;
|
||||
case OMIT:
|
||||
case NULL:
|
||||
assertNull(actual);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
parser = new StrictSyslog5424Parser(CHARSET, getPolicy(), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
|
||||
parser = new StrictSyslog5424Parser(getPolicy(), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -74,18 +57,12 @@ public abstract class BaseStrictSyslog5424ParserTest {
|
|||
final String appName = "su";
|
||||
final String procId = "-";
|
||||
final String msgId = "ID17";
|
||||
final String structuredData = "-";
|
||||
final String body = "'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " "
|
||||
+ appName + " " + procId + " " + msgId + " " + "-" + " " + body;
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final Syslog5424Event event = parser.parseEvent(buffer);
|
||||
final Syslog5424Event event = parser.parseEvent(message);
|
||||
assertNotNull(event);
|
||||
assertTrue(event.isValid());
|
||||
assertFalse(event.getFieldMap().isEmpty());
|
||||
|
@ -97,7 +74,6 @@ public abstract class BaseStrictSyslog5424ParserTest {
|
|||
assertEquals(stamp, fieldMap.get(SyslogAttributes.SYSLOG_TIMESTAMP.key()));
|
||||
assertEquals(host, fieldMap.get(SyslogAttributes.SYSLOG_HOSTNAME.key()));
|
||||
assertEquals(appName, fieldMap.get(Syslog5424Attributes.SYSLOG_APP_NAME.key()));
|
||||
validateForPolicy(procId, fieldMap.get(Syslog5424Attributes.SYSLOG_PROCID.key()));
|
||||
assertEquals(msgId, fieldMap.get(Syslog5424Attributes.SYSLOG_MESSAGEID.key()));
|
||||
|
||||
Pattern structuredPattern = new SyslogPrefixedKeyProvider().getStructuredElementIdParamNamePattern();
|
||||
|
@ -121,18 +97,12 @@ public abstract class BaseStrictSyslog5424ParserTest {
|
|||
final String appName = "su";
|
||||
final String procId = "-";
|
||||
final String msgId = "ID17";
|
||||
final String structuredData = "-";
|
||||
final String body = "'su root' failed for lonvick on /dev/pts/8";
|
||||
|
||||
final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " "
|
||||
+ appName + " " + procId + " " + msgId + " " + "-" + " " + body;
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final Syslog5424Event event = parser.parseEvent(buffer);
|
||||
final Syslog5424Event event = parser.parseEvent(message);
|
||||
assertFalse(event.isValid());
|
||||
}
|
||||
|
||||
|
@ -141,12 +111,7 @@ public abstract class BaseStrictSyslog5424ParserTest {
|
|||
final String message = "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
|
||||
"ID47 - 'su root' failed for lonvick on /dev/pts/8\n";
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final Syslog5424Event event = parser.parseEvent(buffer);
|
||||
final Syslog5424Event event = parser.parseEvent(message);
|
||||
assertNotNull(event);
|
||||
assertTrue(event.isValid());
|
||||
}
|
||||
|
@ -166,12 +131,7 @@ public abstract class BaseStrictSyslog5424ParserTest {
|
|||
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance");
|
||||
|
||||
for (final String message : messages) {
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final Syslog5424Event event = parser.parseEvent(buffer);
|
||||
final Syslog5424Event event = parser.parseEvent(message);
|
||||
assertTrue(event.isValid());
|
||||
}
|
||||
}
|
||||
|
@ -190,54 +150,22 @@ public abstract class BaseStrictSyslog5424ParserTest {
|
|||
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"]");
|
||||
|
||||
for (final String message : messages) {
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final Syslog5424Event event = parser.parseEvent(buffer);
|
||||
final Syslog5424Event event = parser.parseEvent(message);
|
||||
assertTrue(event.isValid());
|
||||
assertNull(event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key()));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidPriority() {
|
||||
final String message = "10 Oct 13 14:14:43 localhost some body of the message";
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final Syslog5424Event event = parser.parseEvent(buffer);
|
||||
final Syslog5424Event event = parser.parseEvent(message);
|
||||
assertNotNull(event);
|
||||
assertFalse(event.isValid());
|
||||
assertEquals(message, event.getFullMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseWithSender() {
|
||||
final String sender = "127.0.0.1";
|
||||
final String message = "<14>1 2014-06-20T09:14:07+00:00 loggregator"
|
||||
+ " d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01"
|
||||
+ " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]"
|
||||
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance";
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
buffer.clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final Syslog5424Event event = parser.parseEvent(buffer, sender);
|
||||
assertNotNull(event);
|
||||
assertTrue(event.isValid());
|
||||
assertEquals(sender, event.getSender());
|
||||
assertEquals("Removing instance", event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseWithBOM() {
|
||||
final String message = "<14>1 2014-06-20T09:14:07+00:00 loggregator"
|
||||
|
@ -245,12 +173,7 @@ public abstract class BaseStrictSyslog5424ParserTest {
|
|||
+ " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]"
|
||||
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] \uFEFFMessage with some Umlauts äöü";
|
||||
|
||||
final byte[] bytes = message.getBytes(CHARSET);
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
|
||||
((Buffer)buffer).clear();
|
||||
buffer.put(bytes);
|
||||
|
||||
final Syslog5424Event event = parser.parseEvent(buffer);
|
||||
final Syslog5424Event event = parser.parseEvent(message);
|
||||
assertNotNull(event);
|
||||
assertTrue(event.isValid());
|
||||
assertEquals("Message with some Umlauts äöü", event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key()));
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider;
|
||||
|
@ -46,8 +45,6 @@ import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
|
|||
import org.apache.nifi.syslog.events.Syslog5424Event;
|
||||
import org.apache.nifi.syslog.attributes.SyslogAttributes;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -56,7 +53,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
|
@ -128,6 +124,7 @@ public class ParseSyslog5424 extends AbstractProcessor {
|
|||
|
||||
private volatile StrictSyslog5424Parser parser;
|
||||
|
||||
private volatile Charset charset;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
|
@ -148,11 +145,9 @@ public class ParseSyslog5424 extends AbstractProcessor {
|
|||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
final String charsetName = context.getProperty(CHARSET).getValue();
|
||||
charset = Charset.forName(context.getProperty(CHARSET).getValue());
|
||||
final String nilPolicyString = context.getProperty(NIL_POLICY).getValue();
|
||||
parser = new StrictSyslog5424Parser(Charset.forName(charsetName),
|
||||
NilHandlingPolicy.valueOf(nilPolicyString),
|
||||
NifiStructuredDataPolicy.FLATTEN,new SyslogPrefixedKeyProvider());
|
||||
parser = new StrictSyslog5424Parser(NilHandlingPolicy.valueOf(nilPolicyString), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
|
||||
}
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
|
@ -168,24 +163,20 @@ public class ParseSyslog5424 extends AbstractProcessor {
|
|||
}
|
||||
|
||||
final byte[] buffer = new byte[(int) flowFile.getSize()];
|
||||
session.read(flowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(final InputStream in) throws IOException {
|
||||
StreamUtils.fillBuffer(in, buffer);
|
||||
}
|
||||
});
|
||||
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
|
||||
final String line = new String(buffer, charset).trim();
|
||||
|
||||
final Syslog5424Event syslogEvent;
|
||||
try {
|
||||
syslogEvent = parser.parseEvent(buffer, null);
|
||||
syslogEvent = parser.parseEvent(line);
|
||||
} catch (final ProcessException pe) {
|
||||
getLogger().error("Failed to parse {} as a Syslog 5424 message due to {}; routing to failure", new Object[] {flowFile, pe});
|
||||
getLogger().error("Failed to parse {} as a Syslog 5424 message; routing to failure", flowFile, pe);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
if (syslogEvent == null || !syslogEvent.isValid()) {
|
||||
getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", new Object[] {flowFile});
|
||||
getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
@ -197,7 +188,6 @@ public class ParseSyslog5424 extends AbstractProcessor {
|
|||
session.transfer(flowFile, REL_SUCCESS);
|
||||
}
|
||||
|
||||
|
||||
private static Map<String,String> convertMap(Map<String, Object> map) {
|
||||
Map<String,String> returnMap = new HashMap<>();
|
||||
map.forEach((key,value) -> returnMap.put(key,(String)value));
|
||||
|
|
|
@ -82,8 +82,9 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea
|
|||
.allowableValues("true", "false")
|
||||
.build();
|
||||
|
||||
private volatile Charset charset;
|
||||
private volatile StrictSyslog5424Parser parser;
|
||||
private volatile static boolean includeRaw;
|
||||
private volatile boolean includeRaw;
|
||||
private volatile RecordSchema recordSchema;
|
||||
|
||||
@Override
|
||||
|
@ -97,9 +98,9 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea
|
|||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
final String charsetName = context.getProperty(CHARSET).getValue();
|
||||
charset = Charset.forName(context.getProperty(CHARSET).getValue());
|
||||
includeRaw = context.getProperty(ADD_RAW).asBoolean();
|
||||
parser = new StrictSyslog5424Parser(Charset.forName(charsetName), NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider());
|
||||
parser = new StrictSyslog5424Parser(NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider());
|
||||
recordSchema = createRecordSchema();
|
||||
}
|
||||
|
||||
|
@ -120,7 +121,7 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea
|
|||
return createAccessStrategy();
|
||||
}
|
||||
|
||||
static RecordSchema createRecordSchema() {
|
||||
RecordSchema createRecordSchema() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(SyslogAttributes.PRIORITY.key(), RecordFieldType.STRING.getDataType(), true));
|
||||
fields.add(new RecordField(SyslogAttributes.SEVERITY.key(), RecordFieldType.STRING.getDataType(), true));
|
||||
|
@ -135,13 +136,12 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea
|
|||
fields.add(new RecordField(Syslog5424Attributes.STRUCTURED_BASE.key(),
|
||||
RecordFieldType.MAP.getMapDataType(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()))));
|
||||
|
||||
if(includeRaw) {
|
||||
if (includeRaw) {
|
||||
fields.add(new RecordField(RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true));
|
||||
}
|
||||
|
||||
SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder().name(RFC_5424_SCHEMA_NAME).build();
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields,schemaIdentifier);
|
||||
return schema;
|
||||
return new SimpleRecordSchema(fields,schemaIdentifier);
|
||||
}
|
||||
|
||||
private SchemaAccessStrategy createAccessStrategy() {
|
||||
|
@ -164,6 +164,6 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea
|
|||
@Override
|
||||
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
|
||||
final RecordSchema schema = getSchema(variables, in, null);
|
||||
return new Syslog5424RecordReader(parser, includeRaw, in, schema);
|
||||
return new Syslog5424RecordReader(parser, includeRaw, charset, in, schema);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,14 +26,12 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes;
|
|||
import org.apache.nifi.syslog.events.Syslog5424Event;
|
||||
import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
@ -41,15 +39,13 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
public class Syslog5424RecordReader implements RecordReader {
|
||||
private static final Logger logger = LoggerFactory.getLogger(Syslog5424RecordReader.class);
|
||||
|
||||
private final BufferedReader reader;
|
||||
private RecordSchema schema;
|
||||
private final RecordSchema schema;
|
||||
private final StrictSyslog5424Parser parser;
|
||||
private final boolean includeRaw;
|
||||
|
||||
public Syslog5424RecordReader(StrictSyslog5424Parser parser, boolean includeRaw, InputStream in, RecordSchema schema){
|
||||
this.reader = new BufferedReader(new InputStreamReader(in));
|
||||
public Syslog5424RecordReader(final StrictSyslog5424Parser parser, final boolean includeRaw, final Charset charset, final InputStream in, final RecordSchema schema) {
|
||||
this.reader = new BufferedReader(new InputStreamReader(in, charset));
|
||||
this.schema = schema;
|
||||
this.parser = parser;
|
||||
this.includeRaw = includeRaw;
|
||||
|
@ -67,15 +63,13 @@ public class Syslog5424RecordReader implements RecordReader {
|
|||
}
|
||||
|
||||
if (StringUtils.isBlank(line)) {
|
||||
logger.debug("Encountered empty line, will skip");
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
Syslog5424Event event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
|
||||
final Syslog5424Event event = parser.parseEvent(line);
|
||||
|
||||
if (!event.isValid()) {
|
||||
if (event.getException() != null) {
|
||||
|
@ -92,7 +86,7 @@ public class Syslog5424RecordReader implements RecordReader {
|
|||
final Map<String,Object> modifiedMap = new HashMap<>(event.getFieldMap());
|
||||
modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key())));
|
||||
|
||||
if(includeRaw) {
|
||||
if (includeRaw) {
|
||||
modifiedMap.put(Syslog5424Reader.RAW_MESSAGE_NAME, line);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.nifi.syslog;
|
||||
|
||||
import org.apache.nifi.avro.AvroTypeUtil;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
|
||||
|
@ -28,11 +27,11 @@ import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
|
|||
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
@ -44,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
|
|||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestSyslog5424RecordReader {
|
||||
private static final Charset CHARSET = Charset.forName("UTF-8");
|
||||
private static final Charset CHARSET = StandardCharsets.UTF_8;
|
||||
private static final String expectedVersion = "1";
|
||||
private static final String expectedMessage = "Removing instance";
|
||||
private static final String expectedAppName = "d0602076-b14a-4c55-852a-981e7afeed38";
|
||||
|
@ -70,12 +69,14 @@ public class TestSyslog5424RecordReader {
|
|||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testParseSingleLine() throws IOException, MalformedRecordException {
|
||||
try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_all.txt"))) {
|
||||
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
|
||||
final Syslog5424Reader reader = new Syslog5424Reader();
|
||||
|
||||
try (final InputStream fis = new FileInputStream("src/test/resources/syslog/syslog5424/log_all.txt")) {
|
||||
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(
|
||||
NilHandlingPolicy.NULL,
|
||||
NifiStructuredDataPolicy.MAP_OF_MAPS,
|
||||
new SimpleKeyProvider());
|
||||
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, true, fis, Syslog5424Reader.createRecordSchema());
|
||||
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, true, CHARSET, fis, reader.createRecordSchema());
|
||||
|
||||
final Record record = deserializer.nextRecord();
|
||||
assertNotNull(record.getValues());
|
||||
|
@ -122,12 +123,14 @@ public class TestSyslog5424RecordReader {
|
|||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testParseSingleLineSomeNulls() throws IOException, MalformedRecordException {
|
||||
try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log.txt"))) {
|
||||
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
|
||||
final Syslog5424Reader reader = new Syslog5424Reader();
|
||||
|
||||
try (final InputStream fis = new FileInputStream("src/test/resources/syslog/syslog5424/log.txt")) {
|
||||
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(
|
||||
NilHandlingPolicy.NULL,
|
||||
NifiStructuredDataPolicy.MAP_OF_MAPS,
|
||||
new SimpleKeyProvider());
|
||||
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, fis, Syslog5424Reader.createRecordSchema());
|
||||
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, CHARSET, fis, reader.createRecordSchema());
|
||||
|
||||
final Record record = deserializer.nextRecord();
|
||||
assertNotNull(record.getValues());
|
||||
|
@ -173,12 +176,14 @@ public class TestSyslog5424RecordReader {
|
|||
|
||||
@Test
|
||||
public void testParseMultipleLine() throws IOException, MalformedRecordException {
|
||||
try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_mix.txt"))) {
|
||||
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
|
||||
final Syslog5424Reader reader = new Syslog5424Reader();
|
||||
|
||||
try (final InputStream fis = new FileInputStream("src/test/resources/syslog/syslog5424/log_mix.txt")) {
|
||||
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(
|
||||
NilHandlingPolicy.NULL,
|
||||
NifiStructuredDataPolicy.MAP_OF_MAPS,
|
||||
new SimpleKeyProvider());
|
||||
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, fis, Syslog5424Reader.createRecordSchema());
|
||||
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, CHARSET, fis, reader.createRecordSchema());
|
||||
|
||||
Record record = deserializer.nextRecord();
|
||||
int count = 0;
|
||||
|
@ -194,12 +199,14 @@ public class TestSyslog5424RecordReader {
|
|||
|
||||
@Test
|
||||
public void testParseMultipleLineWithError() throws IOException, MalformedRecordException {
|
||||
try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_mix_in_error.txt"))) {
|
||||
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
|
||||
final Syslog5424Reader reader = new Syslog5424Reader();
|
||||
|
||||
try (final InputStream fis = new FileInputStream("src/test/resources/syslog/syslog5424/log_mix_in_error.txt")) {
|
||||
StrictSyslog5424Parser parser = new StrictSyslog5424Parser(
|
||||
NilHandlingPolicy.NULL,
|
||||
NifiStructuredDataPolicy.MAP_OF_MAPS,
|
||||
new SimpleKeyProvider());
|
||||
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, fis, Syslog5424Reader.createRecordSchema());
|
||||
final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, CHARSET, fis, reader.createRecordSchema());
|
||||
|
||||
Record record = deserializer.nextRecord();
|
||||
int count = 0;
|
||||
|
@ -218,11 +225,4 @@ public class TestSyslog5424RecordReader {
|
|||
deserializer.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void writeSchema() {
|
||||
String s = Syslog5424Reader.createRecordSchema().toString();
|
||||
System.out.println(s);
|
||||
System.out.println(AvroTypeUtil.extractAvroSchema( Syslog5424Reader.createRecordSchema() ).toString(true));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue