NIFI-10767: When an empty line is encountered with Syslog Readers, just skip over the empty line instead of throwing an Exception

This closes #6623

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-11-04 15:31:50 -04:00 committed by exceptionfactory
parent 61e1a37d2e
commit 381e0f84e1
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 43 additions and 35 deletions

View File

@ -26,6 +26,8 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.Syslog5424Event; import org.apache.nifi.syslog.events.Syslog5424Event;
import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser; import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
@ -39,6 +41,8 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class Syslog5424RecordReader implements RecordReader { public class Syslog5424RecordReader implements RecordReader {
private static final Logger logger = LoggerFactory.getLogger(Syslog5424RecordReader.class);
private final BufferedReader reader; private final BufferedReader reader;
private RecordSchema schema; private RecordSchema schema;
private final StrictSyslog5424Parser parser; private final StrictSyslog5424Parser parser;
@ -53,36 +57,39 @@ public class Syslog5424RecordReader implements RecordReader {
@Override @Override
public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
String line = reader.readLine(); String line;
while (true) {
line = reader.readLine();
if ( line == null ) { if (line == null) {
// a null return from readLine() signals the end of the stream // a null return from readLine() signals the end of the stream
return null; return null;
} }
if (StringUtils.isBlank(line)) { if (StringUtils.isBlank(line)) {
// while an empty string is an error logger.debug("Encountered empty line, will skip");
throw new MalformedRecordException("Encountered a blank message!"); continue;
}
break;
} }
final MalformedRecordException malformedRecordException;
Syslog5424Event event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName()))); Syslog5424Event event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
if (!event.isValid()) { if (!event.isValid()) {
if (event.getException() != null) { if (event.getException() != null) {
malformedRecordException = new MalformedRecordException( throw new MalformedRecordException(
String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC "+ String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC "+
"formats supported", line), event.getException()); "formats supported", line), event.getException());
} else { } else {
malformedRecordException = new MalformedRecordException( throw new MalformedRecordException(
String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" + String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" +
" formats supported", line)); " formats supported", line));
} }
throw malformedRecordException;
} }
Map<String,Object> modifiedMap = new HashMap<>(event.getFieldMap()); final Map<String,Object> modifiedMap = new HashMap<>(event.getFieldMap());
modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key()))); modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key())));
if(includeRaw) { if(includeRaw) {

View File

@ -26,6 +26,8 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.SyslogEvent; import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser; import org.apache.nifi.syslog.parsers.SyslogParser;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
@ -36,6 +38,8 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class SyslogRecordReader implements RecordReader { public class SyslogRecordReader implements RecordReader {
private static final Logger logger = LoggerFactory.getLogger(SyslogRecordReader.class);
private final BufferedReader reader; private final BufferedReader reader;
private RecordSchema schema; private RecordSchema schema;
private final SyslogParser parser; private final SyslogParser parser;
@ -50,24 +54,27 @@ public class SyslogRecordReader implements RecordReader {
@Override @Override
public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
String line = reader.readLine(); String line;
while (true) {
line = reader.readLine();
if (line == null) { if (line == null) {
// a null return from readLine() signals the end of the stream // a null return from readLine() signals the end of the stream
return null; return null;
}
if (StringUtils.isBlank(line)) {
logger.debug("Encountered empty line, will skip");
continue;
}
break;
} }
if (StringUtils.isBlank(line)) { final SyslogEvent event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
// while an empty string is an error
throw new MalformedRecordException("Encountered a blank message!");
}
final MalformedRecordException malformedRecordException;
SyslogEvent event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
if (!event.isValid()) { if (!event.isValid()) {
malformedRecordException = new MalformedRecordException( final MalformedRecordException malformedRecordException = new MalformedRecordException(
String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" + String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" +
" formats supported", line)); " formats supported", line));
throw malformedRecordException; throw malformedRecordException;

View File

@ -157,18 +157,12 @@ public class TestSyslogRecordReader {
Record record = deserializer.nextRecord(); Record record = deserializer.nextRecord();
int count = 0; int count = 0;
int exceptionCount = 0;
while (record != null){ while (record != null){
assertNotNull(record.getValues()); assertNotNull(record.getValues());
try { record = deserializer.nextRecord();
record = deserializer.nextRecord(); count++;
count++;
} catch (Exception e) {
exceptionCount++;
}
} }
assertEquals(count, 3); assertEquals(3, count);
assertEquals(exceptionCount,1);
deserializer.close(); deserializer.close();
} }
} }