diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/main/java/org/apache/nifi/syslog/parsers/StrictSyslog5424Parser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/main/java/org/apache/nifi/syslog/parsers/StrictSyslog5424Parser.java index ed36c9d34f..d2abe2d972 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/main/java/org/apache/nifi/syslog/parsers/StrictSyslog5424Parser.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/main/java/org/apache/nifi/syslog/parsers/StrictSyslog5424Parser.java @@ -21,29 +21,18 @@ import com.github.palindromicity.syslog.NilPolicy; import com.github.palindromicity.syslog.StructuredDataPolicy; import com.github.palindromicity.syslog.SyslogParserBuilder; import org.apache.nifi.syslog.events.Syslog5424Event; -import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider; import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy; import org.apache.nifi.syslog.utils.NilHandlingPolicy; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - /** * Parses a Syslog message from a ByteBuffer into a Syslog5424Event instance. * For 5424 we use simple-syslog-5424 since it parsers out structured data. */ public class StrictSyslog5424Parser { - private Charset charset; - private com.github.palindromicity.syslog.SyslogParser parser; + private final com.github.palindromicity.syslog.SyslogParser parser; - public StrictSyslog5424Parser() { - this(StandardCharsets.UTF_8, NilHandlingPolicy.NULL, NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider()); - } - - public StrictSyslog5424Parser(final Charset charset, final NilHandlingPolicy nilPolicy, - NifiStructuredDataPolicy structuredDataPolicy, KeyProvider keyProvider) { - this.charset = charset; + public StrictSyslog5424Parser(final NilHandlingPolicy nilPolicy, + final NifiStructuredDataPolicy structuredDataPolicy, final KeyProvider keyProvider) { parser = new SyslogParserBuilder() .withNilPolicy(NilPolicy.valueOf(nilPolicy.name())) .withStructuredDataPolicy(StructuredDataPolicy.valueOf(structuredDataPolicy.name())) @@ -52,54 +41,17 @@ public class StrictSyslog5424Parser { } /** - * Parses a Syslog5424Event from a {@code ByteBuffer}. + * Parses a Syslog5424Event from a String * - * @param buffer a {@code ByteBuffer} containing a syslog message - * @return a Syslog5424Event parsed from the {@code {@code byte array}} + * @param line a {@code String} containing a syslog message + * @return a Syslog5424Event parsed from the input line */ - public Syslog5424Event parseEvent(final ByteBuffer buffer) { - return parseEvent(buffer, null); - } - - /** - * Parses a Syslog5424Event from a {@code ByteBuffer}. - * - * @param buffer a {@code ByteBuffer} containing a syslog message - * @param sender the hostname of the syslog server that sent the message - * @return a Syslog5424Event parsed from the {@code byte array} - */ - public Syslog5424Event parseEvent(final ByteBuffer buffer, final String sender) { - if (buffer == null) { - return null; - } - return parseEvent(bufferToBytes(buffer), sender); - } - - /** - * Parses a Syslog5424Event from a {@code byte array}. - * - * @param bytes a {@code byte array} containing a syslog message - * @param sender the hostname of the syslog server that sent the message - * @return a Syslog5424Event parsed from the {@code byte array} - */ - public Syslog5424Event parseEvent(final byte[] bytes, final String sender) { - if (bytes == null || bytes.length == 0) { - return null; - } - - // remove trailing new line before parsing - int length = bytes.length; - if (bytes[length - 1] == '\n') { - length = length - 1; - } - - final String message = new String(bytes, 0, length, charset); - + public Syslog5424Event parseEvent(final String line) { final Syslog5424Event.Builder builder = new Syslog5424Event.Builder() - .valid(false).fullMessage(message).rawMessage(bytes).sender(sender); + .valid(false).fullMessage(line); try { - parser.parseLine(message, builder::fieldMap); + parser.parseLine(line, builder::fieldMap); builder.valid(true); } catch (Exception e) { // this is not a valid 5424 message @@ -110,21 +62,4 @@ public class StrictSyslog5424Parser { // either invalid w/original msg, or fully parsed event return builder.build(); } - - public String getCharsetName() { - return charset == null ? StandardCharsets.UTF_8.name() : charset.name(); - } - - - private byte[] bufferToBytes(ByteBuffer buffer) { - if (buffer == null) { - return null; - } - if (buffer.position() != 0) { - buffer.flip(); - } - byte bytes[] = new byte[buffer.limit()]; - buffer.get(bytes, 0, buffer.limit()); - return bytes; - } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/test/java/org/apache/nifi/syslog/BaseStrictSyslog5424ParserTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/test/java/org/apache/nifi/syslog/BaseStrictSyslog5424ParserTest.java index cb5feb49ef..81ca409448 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/test/java/org/apache/nifi/syslog/BaseStrictSyslog5424ParserTest.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/test/java/org/apache/nifi/syslog/BaseStrictSyslog5424ParserTest.java @@ -26,9 +26,6 @@ import org.apache.nifi.syslog.utils.NilHandlingPolicy; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.nio.Buffer; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -42,27 +39,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public abstract class BaseStrictSyslog5424ParserTest { - private static final Charset CHARSET = Charset.forName("UTF-8"); - private static final String NIL_VALUE = "-"; private StrictSyslog5424Parser parser; protected abstract NilHandlingPolicy getPolicy(); - protected void validateForPolicy(String expected, Object actual) { - switch (getPolicy()) { - case DASH: - assertEquals(actual, NIL_VALUE); - break; - case OMIT: - case NULL: - assertNull(actual); - - } - } - @BeforeEach public void setup() { - parser = new StrictSyslog5424Parser(CHARSET, getPolicy(), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider()); + parser = new StrictSyslog5424Parser(getPolicy(), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider()); } @Test @@ -74,18 +57,12 @@ public abstract class BaseStrictSyslog5424ParserTest { final String appName = "su"; final String procId = "-"; final String msgId = "ID17"; - final String structuredData = "-"; final String body = "'su root' failed for lonvick on /dev/pts/8"; final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + appName + " " + procId + " " + msgId + " " + "-" + " " + body; - final byte[] bytes = message.getBytes(CHARSET); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - buffer.clear(); - buffer.put(bytes); - - final Syslog5424Event event = parser.parseEvent(buffer); + final Syslog5424Event event = parser.parseEvent(message); assertNotNull(event); assertTrue(event.isValid()); assertFalse(event.getFieldMap().isEmpty()); @@ -97,7 +74,6 @@ public abstract class BaseStrictSyslog5424ParserTest { assertEquals(stamp, fieldMap.get(SyslogAttributes.SYSLOG_TIMESTAMP.key())); assertEquals(host, fieldMap.get(SyslogAttributes.SYSLOG_HOSTNAME.key())); assertEquals(appName, fieldMap.get(Syslog5424Attributes.SYSLOG_APP_NAME.key())); - validateForPolicy(procId, fieldMap.get(Syslog5424Attributes.SYSLOG_PROCID.key())); assertEquals(msgId, fieldMap.get(Syslog5424Attributes.SYSLOG_MESSAGEID.key())); Pattern structuredPattern = new SyslogPrefixedKeyProvider().getStructuredElementIdParamNamePattern(); @@ -121,18 +97,12 @@ public abstract class BaseStrictSyslog5424ParserTest { final String appName = "su"; final String procId = "-"; final String msgId = "ID17"; - final String structuredData = "-"; final String body = "'su root' failed for lonvick on /dev/pts/8"; final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + appName + " " + procId + " " + msgId + " " + "-" + " " + body; - final byte[] bytes = message.getBytes(CHARSET); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - buffer.clear(); - buffer.put(bytes); - - final Syslog5424Event event = parser.parseEvent(buffer); + final Syslog5424Event event = parser.parseEvent(message); assertFalse(event.isValid()); } @@ -141,12 +111,7 @@ public abstract class BaseStrictSyslog5424ParserTest { final String message = "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " + "ID47 - 'su root' failed for lonvick on /dev/pts/8\n"; - final byte[] bytes = message.getBytes(CHARSET); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - buffer.clear(); - buffer.put(bytes); - - final Syslog5424Event event = parser.parseEvent(buffer); + final Syslog5424Event event = parser.parseEvent(message); assertNotNull(event); assertTrue(event.isValid()); } @@ -166,12 +131,7 @@ public abstract class BaseStrictSyslog5424ParserTest { + "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance"); for (final String message : messages) { - final byte[] bytes = message.getBytes(CHARSET); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - buffer.clear(); - buffer.put(bytes); - - final Syslog5424Event event = parser.parseEvent(buffer); + final Syslog5424Event event = parser.parseEvent(message); assertTrue(event.isValid()); } } @@ -190,54 +150,22 @@ public abstract class BaseStrictSyslog5424ParserTest { + "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"]"); for (final String message : messages) { - final byte[] bytes = message.getBytes(CHARSET); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - buffer.clear(); - buffer.put(bytes); - - final Syslog5424Event event = parser.parseEvent(buffer); + final Syslog5424Event event = parser.parseEvent(message); assertTrue(event.isValid()); assertNull(event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key())); } - - } @Test public void testInvalidPriority() { final String message = "10 Oct 13 14:14:43 localhost some body of the message"; - final byte[] bytes = message.getBytes(CHARSET); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - buffer.clear(); - buffer.put(bytes); - - final Syslog5424Event event = parser.parseEvent(buffer); + final Syslog5424Event event = parser.parseEvent(message); assertNotNull(event); assertFalse(event.isValid()); assertEquals(message, event.getFullMessage()); } - @Test - public void testParseWithSender() { - final String sender = "127.0.0.1"; - final String message = "<14>1 2014-06-20T09:14:07+00:00 loggregator" - + " d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01" - + " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]" - + "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance"; - - final byte[] bytes = message.getBytes(CHARSET); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - buffer.clear(); - buffer.put(bytes); - - final Syslog5424Event event = parser.parseEvent(buffer, sender); - assertNotNull(event); - assertTrue(event.isValid()); - assertEquals(sender, event.getSender()); - assertEquals("Removing instance", event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key())); - } - @Test public void testParseWithBOM() { final String message = "<14>1 2014-06-20T09:14:07+00:00 loggregator" @@ -245,12 +173,7 @@ public abstract class BaseStrictSyslog5424ParserTest { + " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]" + "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] \uFEFFMessage with some Umlauts äöü"; - final byte[] bytes = message.getBytes(CHARSET); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - ((Buffer)buffer).clear(); - buffer.put(bytes); - - final Syslog5424Event event = parser.parseEvent(buffer); + final Syslog5424Event event = parser.parseEvent(message); assertNotNull(event); assertTrue(event.isValid()); assertEquals("Message with some Umlauts äöü", event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key())); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java index 08f7c2670e..174d78cd03 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java @@ -36,7 +36,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider; @@ -46,8 +45,6 @@ import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser; import org.apache.nifi.syslog.events.Syslog5424Event; import org.apache.nifi.syslog.attributes.SyslogAttributes; -import java.io.IOException; -import java.io.InputStream; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; @@ -56,7 +53,6 @@ import java.util.List; import java.util.Map; import java.util.Set; - @SideEffectFree @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @@ -128,6 +124,7 @@ public class ParseSyslog5424 extends AbstractProcessor { private volatile StrictSyslog5424Parser parser; + private volatile Charset charset; @Override protected List getSupportedPropertyDescriptors() { @@ -148,11 +145,9 @@ public class ParseSyslog5424 extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { - final String charsetName = context.getProperty(CHARSET).getValue(); + charset = Charset.forName(context.getProperty(CHARSET).getValue()); final String nilPolicyString = context.getProperty(NIL_POLICY).getValue(); - parser = new StrictSyslog5424Parser(Charset.forName(charsetName), - NilHandlingPolicy.valueOf(nilPolicyString), - NifiStructuredDataPolicy.FLATTEN,new SyslogPrefixedKeyProvider()); + parser = new StrictSyslog5424Parser(NilHandlingPolicy.valueOf(nilPolicyString), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider()); } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { @@ -168,24 +163,20 @@ public class ParseSyslog5424 extends AbstractProcessor { } final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer); - } - }); + session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer)); + final String line = new String(buffer, charset).trim(); final Syslog5424Event syslogEvent; try { - syslogEvent = parser.parseEvent(buffer, null); + syslogEvent = parser.parseEvent(line); } catch (final ProcessException pe) { - getLogger().error("Failed to parse {} as a Syslog 5424 message due to {}; routing to failure", new Object[] {flowFile, pe}); + getLogger().error("Failed to parse {} as a Syslog 5424 message; routing to failure", flowFile, pe); session.transfer(flowFile, REL_FAILURE); return; } if (syslogEvent == null || !syslogEvent.isValid()) { - getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", new Object[] {flowFile}); + getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", flowFile); session.transfer(flowFile, REL_FAILURE); return; } @@ -197,7 +188,6 @@ public class ParseSyslog5424 extends AbstractProcessor { session.transfer(flowFile, REL_SUCCESS); } - private static Map convertMap(Map map) { Map returnMap = new HashMap<>(); map.forEach((key,value) -> returnMap.put(key,(String)value)); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java index 0f988e9510..eab295b9cc 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java @@ -82,8 +82,9 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea .allowableValues("true", "false") .build(); + private volatile Charset charset; private volatile StrictSyslog5424Parser parser; - private volatile static boolean includeRaw; + private volatile boolean includeRaw; private volatile RecordSchema recordSchema; @Override @@ -97,9 +98,9 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea @OnEnabled public void onEnabled(final ConfigurationContext context) { - final String charsetName = context.getProperty(CHARSET).getValue(); + charset = Charset.forName(context.getProperty(CHARSET).getValue()); includeRaw = context.getProperty(ADD_RAW).asBoolean(); - parser = new StrictSyslog5424Parser(Charset.forName(charsetName), NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider()); + parser = new StrictSyslog5424Parser(NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider()); recordSchema = createRecordSchema(); } @@ -120,7 +121,7 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea return createAccessStrategy(); } - static RecordSchema createRecordSchema() { + RecordSchema createRecordSchema() { final List fields = new ArrayList<>(); fields.add(new RecordField(SyslogAttributes.PRIORITY.key(), RecordFieldType.STRING.getDataType(), true)); fields.add(new RecordField(SyslogAttributes.SEVERITY.key(), RecordFieldType.STRING.getDataType(), true)); @@ -135,13 +136,12 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea fields.add(new RecordField(Syslog5424Attributes.STRUCTURED_BASE.key(), RecordFieldType.MAP.getMapDataType(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType())))); - if(includeRaw) { + if (includeRaw) { fields.add(new RecordField(RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true)); } SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder().name(RFC_5424_SCHEMA_NAME).build(); - final RecordSchema schema = new SimpleRecordSchema(fields,schemaIdentifier); - return schema; + return new SimpleRecordSchema(fields,schemaIdentifier); } private SchemaAccessStrategy createAccessStrategy() { @@ -164,6 +164,6 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea @Override public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); - return new Syslog5424RecordReader(parser, includeRaw, in, schema); + return new Syslog5424RecordReader(parser, includeRaw, charset, in, schema); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java index be48f65904..28bc1d83ed 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java @@ -26,14 +26,12 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes; import org.apache.nifi.syslog.events.Syslog5424Event; import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser; import org.apache.nifi.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.sql.Timestamp; import java.time.Instant; import java.time.format.DateTimeFormatter; @@ -41,15 +39,13 @@ import java.util.HashMap; import java.util.Map; public class Syslog5424RecordReader implements RecordReader { - private static final Logger logger = LoggerFactory.getLogger(Syslog5424RecordReader.class); - private final BufferedReader reader; - private RecordSchema schema; + private final RecordSchema schema; private final StrictSyslog5424Parser parser; private final boolean includeRaw; - public Syslog5424RecordReader(StrictSyslog5424Parser parser, boolean includeRaw, InputStream in, RecordSchema schema){ - this.reader = new BufferedReader(new InputStreamReader(in)); + public Syslog5424RecordReader(final StrictSyslog5424Parser parser, final boolean includeRaw, final Charset charset, final InputStream in, final RecordSchema schema) { + this.reader = new BufferedReader(new InputStreamReader(in, charset)); this.schema = schema; this.parser = parser; this.includeRaw = includeRaw; @@ -67,15 +63,13 @@ public class Syslog5424RecordReader implements RecordReader { } if (StringUtils.isBlank(line)) { - logger.debug("Encountered empty line, will skip"); continue; } break; } - - Syslog5424Event event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName()))); + final Syslog5424Event event = parser.parseEvent(line); if (!event.isValid()) { if (event.getException() != null) { @@ -92,7 +86,7 @@ public class Syslog5424RecordReader implements RecordReader { final Map modifiedMap = new HashMap<>(event.getFieldMap()); modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key()))); - if(includeRaw) { + if (includeRaw) { modifiedMap.put(Syslog5424Reader.RAW_MESSAGE_NAME, line); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java index 2454f410a1..b9703c66a5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java @@ -17,7 +17,6 @@ package org.apache.nifi.syslog; -import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.syslog.attributes.Syslog5424Attributes; @@ -28,11 +27,11 @@ import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy; import org.apache.nifi.syslog.utils.NilHandlingPolicy; import org.junit.jupiter.api.Test; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.time.Instant; import java.time.format.DateTimeFormatter; @@ -44,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestSyslog5424RecordReader { - private static final Charset CHARSET = Charset.forName("UTF-8"); + private static final Charset CHARSET = StandardCharsets.UTF_8; private static final String expectedVersion = "1"; private static final String expectedMessage = "Removing instance"; private static final String expectedAppName = "d0602076-b14a-4c55-852a-981e7afeed38"; @@ -70,12 +69,14 @@ public class TestSyslog5424RecordReader { @Test @SuppressWarnings("unchecked") public void testParseSingleLine() throws IOException, MalformedRecordException { - try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_all.txt"))) { - StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET, + final Syslog5424Reader reader = new Syslog5424Reader(); + + try (final InputStream fis = new FileInputStream("src/test/resources/syslog/syslog5424/log_all.txt")) { + StrictSyslog5424Parser parser = new StrictSyslog5424Parser( NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider()); - final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, true, fis, Syslog5424Reader.createRecordSchema()); + final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, true, CHARSET, fis, reader.createRecordSchema()); final Record record = deserializer.nextRecord(); assertNotNull(record.getValues()); @@ -122,12 +123,14 @@ public class TestSyslog5424RecordReader { @Test @SuppressWarnings("unchecked") public void testParseSingleLineSomeNulls() throws IOException, MalformedRecordException { - try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log.txt"))) { - StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET, + final Syslog5424Reader reader = new Syslog5424Reader(); + + try (final InputStream fis = new FileInputStream("src/test/resources/syslog/syslog5424/log.txt")) { + StrictSyslog5424Parser parser = new StrictSyslog5424Parser( NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider()); - final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, fis, Syslog5424Reader.createRecordSchema()); + final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, CHARSET, fis, reader.createRecordSchema()); final Record record = deserializer.nextRecord(); assertNotNull(record.getValues()); @@ -173,12 +176,14 @@ public class TestSyslog5424RecordReader { @Test public void testParseMultipleLine() throws IOException, MalformedRecordException { - try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_mix.txt"))) { - StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET, + final Syslog5424Reader reader = new Syslog5424Reader(); + + try (final InputStream fis = new FileInputStream("src/test/resources/syslog/syslog5424/log_mix.txt")) { + StrictSyslog5424Parser parser = new StrictSyslog5424Parser( NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider()); - final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, fis, Syslog5424Reader.createRecordSchema()); + final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, CHARSET, fis, reader.createRecordSchema()); Record record = deserializer.nextRecord(); int count = 0; @@ -194,12 +199,14 @@ public class TestSyslog5424RecordReader { @Test public void testParseMultipleLineWithError() throws IOException, MalformedRecordException { - try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_mix_in_error.txt"))) { - StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET, + final Syslog5424Reader reader = new Syslog5424Reader(); + + try (final InputStream fis = new FileInputStream("src/test/resources/syslog/syslog5424/log_mix_in_error.txt")) { + StrictSyslog5424Parser parser = new StrictSyslog5424Parser( NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider()); - final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, fis, Syslog5424Reader.createRecordSchema()); + final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, false, CHARSET, fis, reader.createRecordSchema()); Record record = deserializer.nextRecord(); int count = 0; @@ -218,11 +225,4 @@ public class TestSyslog5424RecordReader { deserializer.close(); } } - - public void writeSchema() { - String s = Syslog5424Reader.createRecordSchema().toString(); - System.out.println(s); - System.out.println(AvroTypeUtil.extractAvroSchema( Syslog5424Reader.createRecordSchema() ).toString(true)); - } - }