NIFI-3949: Updated Grok Reader to allow for sub-patterns to be used when determining the schema

This closes #1839.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2017-05-22 16:04:34 -04:00 committed by Bryan Bende
parent 6937a6cf64
commit a1b07b1e9c
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 79 additions and 18 deletions

View File

@ -133,30 +133,19 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue());
final String schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
if (STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
this.recordSchema = createRecordSchema(grok);
} else {
this.recordSchema = null;
}
}
static RecordSchema createRecordSchema(final Grok grok) {
final List<RecordField> fields = new ArrayList<>();
String grokExpression = grok.getOriginalGrokPattern();
while (grokExpression.length() > 0) {
final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression);
if (matcher.find()) {
final Map<String, String> namedGroups = GrokUtils.namedGroups(matcher, grokExpression);
final String fieldName = namedGroups.get("subname");
DataType dataType = RecordFieldType.STRING.getDataType();
final RecordField recordField = new RecordField(fieldName, dataType);
fields.add(recordField);
if (grokExpression.length() > matcher.end() + 1) {
grokExpression = grokExpression.substring(matcher.end() + 1);
} else {
break;
}
}
}
populateSchemaFieldNames(grok, grokExpression, fields);
fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType()));
@ -164,6 +153,38 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
return schema;
}
/**
 * Scans a Grok expression and appends a STRING-typed {@link RecordField} to {@code fields} for every
 * pattern reference that carries a name (the {@code "subname"} group returned by
 * {@code GrokUtils.namedGroups}). A reference without a name is looked up by its {@code "pattern"}
 * group in {@code grok.getPatterns()} and the resulting definition is scanned recursively, so names
 * declared inside sub-patterns are collected as well.
 *
 * @param grok the Grok instance whose pattern definitions are used to resolve unnamed references
 * @param grokExpression the expression to scan; {@code null} (for example, a reference to a pattern
 *                       that has no definition) is treated as contributing no fields
 * @param fields the list that discovered fields are appended to, in the order they are encountered
 */
private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List<RecordField> fields) {
    // An undefined sub-pattern resolves to null; without this guard the recursive call would throw
    // a NullPointerException on grokExpression.length().
    if (grokExpression == null) {
        return;
    }

    while (grokExpression.length() > 0) {
        final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression);
        if (!matcher.find()) {
            // No further pattern references in the remaining text.
            break;
        }

        final Map<String, String> extractedGroups = GrokUtils.namedGroups(matcher, grokExpression);
        final String subName = extractedGroups.get("subname");

        if (subName != null) {
            final DataType dataType = RecordFieldType.STRING.getDataType();
            fields.add(new RecordField(subName, dataType));
        } else {
            final String subPatternName = extractedGroups.get("pattern");
            if (subPatternName != null) {
                // Unnamed reference: descend into the referenced pattern's own definition.
                populateSchemaFieldNames(grok, grok.getPatterns().get(subPatternName), fields);
            }
            // When neither group is present there is nothing to record. Fall through to the
            // advance step below; re-entering the loop here without shortening grokExpression
            // would find the same match again and never terminate.
        }

        // Stop when at most one character is left after the match; otherwise drop everything up
        // to the end of the match and look for the next reference.
        if (grokExpression.length() <= matcher.end() + 1) {
            break;
        }
        grokExpression = grokExpression.substring(matcher.end());
    }
}
@Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {

View File

@ -25,12 +25,15 @@ import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Test;
import io.thekraken.grok.api.Grok;
@ -186,4 +189,41 @@ public class TestGrokRecordReader {
}
}
/**
 * Verifies that field names declared inside a referenced sub-pattern ({@code %{SYSLOGBASE}}) are
 * included in the schema built by {@code GrokReader.createRecordSchema}, and that a record read with
 * that schema exposes the values captured for those names.
 */
@Test
public void testInheritNamedParameters() throws FileNotFoundException, IOException, GrokException, MalformedRecordException {
    final String syslogMsg = "May 22 15:58:23 my-host nifi[12345]:My Message";
    // Encode with an explicit charset so the test input does not depend on the platform default.
    final byte[] msgBytes = syslogMsg.getBytes(StandardCharsets.UTF_8);

    try (final InputStream in = new ByteArrayInputStream(msgBytes)) {
        final Grok grok = new Grok();
        grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
        grok.compile("%{SYSLOGBASE}%{GREEDYDATA:message}");

        final RecordSchema schema = GrokReader.createRecordSchema(grok);
        final List<String> fieldNames = schema.getFieldNames();

        // Seven named captures plus the implicit stackTrace column.
        assertEquals(8, fieldNames.size());
        assertTrue(fieldNames.contains("timestamp"));
        assertTrue(fieldNames.contains("logsource"));
        assertTrue(fieldNames.contains("facility"));
        assertTrue(fieldNames.contains("priority"));
        assertTrue(fieldNames.contains("program"));
        assertTrue(fieldNames.contains("pid"));
        assertTrue(fieldNames.contains("message"));
        assertTrue(fieldNames.contains("stackTrace")); // always implicitly there

        final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, true);
        final Record record = deserializer.nextRecord();

        assertEquals("May 22 15:58:23", record.getValue("timestamp"));
        assertEquals("my-host", record.getValue("logsource"));
        // facility and priority are part of the schema but are expected to have no value for this message.
        assertNull(record.getValue("facility"));
        assertNull(record.getValue("priority"));
        assertEquals("nifi", record.getValue("program"));
        assertEquals("12345", record.getValue("pid"));
        assertEquals("My Message", record.getValue("message"));

        // The input holds a single line, so no second record is expected.
        assertNull(deserializer.nextRecord());
    }
}
}