NIFI-4102: If first line read does not match Grok expression, skip line

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1932
This commit is contained in:
Mark Payne 2017-06-21 14:39:11 -04:00 committed by Matt Burgess
parent ba3372a1dc
commit 73e601bc55
2 changed files with 82 additions and 14 deletions

View File

@ -21,7 +21,6 @@ import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -70,19 +69,17 @@ public class GrokRecordReader implements RecordReader {
@Override @Override
public Record nextRecord() throws IOException, MalformedRecordException { public Record nextRecord() throws IOException, MalformedRecordException {
final String line = nextLine == null ? reader.readLine() : nextLine; Map<String, Object> valueMap = null;
nextLine = null; // ensure that we don't process nextLine again while (valueMap == null || valueMap.isEmpty()) {
if (line == null) { final String line = nextLine == null ? reader.readLine() : nextLine;
return null; nextLine = null; // ensure that we don't process nextLine again
} if (line == null) {
return null;
}
final RecordSchema schema = getSchema(); final Match match = grok.match(line);
match.captures();
final Match match = grok.match(line); valueMap = match.toMap();
match.captures();
final Map<String, Object> valueMap = match.toMap();
if (valueMap.isEmpty()) { // We were unable to match the pattern so return an empty Object array.
return new MapRecord(schema, Collections.emptyMap());
} }
// Read the next line to see if it matches the pattern (in which case we will simply leave it for // Read the next line to see if it matches the pattern (in which case we will simply leave it for
@ -149,7 +146,7 @@ public class GrokRecordReader implements RecordReader {
return new MapRecord(schema, values); return new MapRecord(schema, values);
} catch (final Exception e) { } catch (final Exception e) {
throw new MalformedRecordException("Found invalid log record and will skip it. Record: " + line, e); throw new MalformedRecordException("Found invalid log record and will skip it. Record: " + nextLine, e);
} }
} }

View File

@ -226,4 +226,75 @@ public class TestGrokRecordReader {
} }
} }
@Test
public void testSkipUnmatchedRecordFirstLine() throws GrokException, IOException, MalformedRecordException {
final String nonMatchingRecord = "hello there";
final String matchingRecord = "1 2 3 4 5";
final String input = nonMatchingRecord + "\n" + matchingRecord;
final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(6, fieldNames.size());
assertTrue(fieldNames.contains("first"));
assertTrue(fieldNames.contains("second"));
assertTrue(fieldNames.contains("third"));
assertTrue(fieldNames.contains("fourth"));
assertTrue(fieldNames.contains("fifth"));
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, false);
final Record record = deserializer.nextRecord();
assertEquals("1", record.getValue("first"));
assertEquals("2", record.getValue("second"));
assertEquals("3", record.getValue("third"));
assertEquals("4", record.getValue("fourth"));
assertEquals("5", record.getValue("fifth"));
assertNull(deserializer.nextRecord());
}
}
@Test
public void testSkipUnmatchedRecordMiddle() throws GrokException, IOException, MalformedRecordException {
final String nonMatchingRecord = "hello there";
final String matchingRecord = "1 2 3 4 5";
final String input = matchingRecord + "\n" + nonMatchingRecord + "\n" + matchingRecord;
final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(6, fieldNames.size());
assertTrue(fieldNames.contains("first"));
assertTrue(fieldNames.contains("second"));
assertTrue(fieldNames.contains("third"));
assertTrue(fieldNames.contains("fourth"));
assertTrue(fieldNames.contains("fifth"));
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, false);
for (int i = 0; i < 2; i++) {
final Record record = deserializer.nextRecord();
assertEquals("1", record.getValue("first"));
assertEquals("2", record.getValue("second"));
assertEquals("3", record.getValue("third"));
assertEquals("4", record.getValue("fourth"));
assertEquals("5", record.getValue("fifth"));
}
assertNull(deserializer.nextRecord());
}
}
} }