diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java index 520c95fba1..be48f65904 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java @@ -26,6 +26,8 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes; import org.apache.nifi.syslog.events.Syslog5424Event; import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser; import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; @@ -39,6 +41,8 @@ import java.util.HashMap; import java.util.Map; public class Syslog5424RecordReader implements RecordReader { + private static final Logger logger = LoggerFactory.getLogger(Syslog5424RecordReader.class); + private final BufferedReader reader; private RecordSchema schema; private final StrictSyslog5424Parser parser; @@ -53,36 +57,39 @@ public class Syslog5424RecordReader implements RecordReader { @Override public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { - String line = reader.readLine(); + String line; + while (true) { + line = reader.readLine(); - if ( line == null ) { - // a null return from readLine() signals the end of the stream - return null; - } + if (line == null) { + // a null return from readLine() signals the end of the stream + return null; + } - if (StringUtils.isBlank(line)) { - // while an empty string is an error - throw new MalformedRecordException("Encountered a blank message!"); + if (StringUtils.isBlank(line)) { + logger.debug("Encountered empty line, will skip"); + continue; + } + + break; } - final MalformedRecordException malformedRecordException; Syslog5424Event event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName()))); if (!event.isValid()) { if (event.getException() != null) { - malformedRecordException = new MalformedRecordException( + throw new MalformedRecordException( String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC "+ "formats supported", line), event.getException()); } else { - malformedRecordException = new MalformedRecordException( + throw new MalformedRecordException( String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" + " formats supported", line)); } - throw malformedRecordException; } - Map modifiedMap = new HashMap<>(event.getFieldMap()); + final Map modifiedMap = new HashMap<>(event.getFieldMap()); modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key()))); if(includeRaw) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogRecordReader.java index 43ceab92ca..e92f296444 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogRecordReader.java @@ -26,6 +26,8 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes; import org.apache.nifi.syslog.events.SyslogEvent; import org.apache.nifi.syslog.parsers.SyslogParser; import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; @@ -36,6 +38,8 @@ import java.util.HashMap; import java.util.Map; public class SyslogRecordReader implements RecordReader { + private static final Logger logger = LoggerFactory.getLogger(SyslogRecordReader.class); + private final BufferedReader reader; private RecordSchema schema; private final SyslogParser parser; @@ -50,24 +54,27 @@ public class SyslogRecordReader implements RecordReader { @Override public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { - String line = reader.readLine(); + String line; + while (true) { + line = reader.readLine(); - if (line == null) { - // a null return from readLine() signals the end of the stream - return null; + if (line == null) { + // a null return from readLine() signals the end of the stream + return null; + } + + if (StringUtils.isBlank(line)) { + logger.debug("Encountered empty line, will skip"); + continue; + } + + break; } - if (StringUtils.isBlank(line)) { - // while an empty string is an error - throw new MalformedRecordException("Encountered a blank message!"); - } - - - final MalformedRecordException malformedRecordException; - SyslogEvent event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName()))); + final SyslogEvent event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName()))); if (!event.isValid()) { - malformedRecordException = new MalformedRecordException( + final MalformedRecordException malformedRecordException = new MalformedRecordException( String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" + " formats supported", line)); throw malformedRecordException; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslogRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslogRecordReader.java index 27751ac258..1af8a96cf8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslogRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslogRecordReader.java @@ -157,18 +157,12 @@ public class TestSyslogRecordReader { Record record = deserializer.nextRecord(); int count = 0; - int exceptionCount = 0; while (record != null){ assertNotNull(record.getValues()); - try { - record = deserializer.nextRecord(); - count++; - } catch (Exception e) { - exceptionCount++; - } + record = deserializer.nextRecord(); + count++; } - assertEquals(count, 3); - assertEquals(exceptionCount,1); + assertEquals(3, count); deserializer.close(); } }