diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index a874632e89..dcf8b5a9ac 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -133,30 +133,19 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()); - this.recordSchema = createRecordSchema(grok); + final String schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue(); + if (STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) { + this.recordSchema = createRecordSchema(grok); + } else { + this.recordSchema = null; + } } static RecordSchema createRecordSchema(final Grok grok) { final List fields = new ArrayList<>(); String grokExpression = grok.getOriginalGrokPattern(); - while (grokExpression.length() > 0) { - final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression); - if (matcher.find()) { - final Map namedGroups = GrokUtils.namedGroups(matcher, grokExpression); - final String fieldName = namedGroups.get("subname"); - - DataType dataType = RecordFieldType.STRING.getDataType(); - final RecordField recordField = new RecordField(fieldName, dataType); - fields.add(recordField); - - if (grokExpression.length() > matcher.end() + 1) { - grokExpression = grokExpression.substring(matcher.end() + 1); - } else { - break; - } - } - } + populateSchemaFieldNames(grok, grokExpression, fields); fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); @@ -164,6 +153,38 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac return schema; } + private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List fields) { + while (grokExpression.length() > 0) { + final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression); + if (matcher.find()) { + final Map extractedGroups = GrokUtils.namedGroups(matcher, grokExpression); + final String subName = extractedGroups.get("subname"); + + if (subName == null) { + final String subPatternName = extractedGroups.get("pattern"); + if (subPatternName == null) { + continue; + } + + final String subExpression = grok.getPatterns().get(subPatternName); + populateSchemaFieldNames(grok, subExpression, fields); + } else { + DataType dataType = RecordFieldType.STRING.getDataType(); + final RecordField recordField = new RecordField(subName, dataType); + fields.add(recordField); + } + + if (grokExpression.length() > matcher.end() + 1) { + grokExpression = grokExpression.substring(matcher.end()); + } else { + break; + } + } else { + break; + } + } + } + @Override protected List getSchemaAccessStrategyValues() { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java index ae5d433eff..1f9d57206e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java @@ -25,12 +25,15 @@ import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.List; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; import org.junit.Test; import io.thekraken.grok.api.Grok; @@ -186,4 +189,41 @@ public class TestGrokRecordReader { } } + @Test + public void testInheritNamedParameters() throws FileNotFoundException, IOException, GrokException, MalformedRecordException { + final String syslogMsg = "May 22 15:58:23 my-host nifi[12345]:My Message"; + final byte[] msgBytes = syslogMsg.getBytes(); + + try (final InputStream in = new ByteArrayInputStream(msgBytes)) { + final Grok grok = new Grok(); + grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); + grok.compile("%{SYSLOGBASE}%{GREEDYDATA:message}"); + + final RecordSchema schema = GrokReader.createRecordSchema(grok); + final List fieldNames = schema.getFieldNames(); + assertEquals(8, fieldNames.size()); + assertTrue(fieldNames.contains("timestamp")); + assertTrue(fieldNames.contains("logsource")); + assertTrue(fieldNames.contains("facility")); + assertTrue(fieldNames.contains("priority")); + assertTrue(fieldNames.contains("program")); + assertTrue(fieldNames.contains("pid")); + assertTrue(fieldNames.contains("message")); + assertTrue(fieldNames.contains("stackTrace")); // always implicitly there + + final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, true); + final Record record = deserializer.nextRecord(); + + assertEquals("May 22 15:58:23", record.getValue("timestamp")); + assertEquals("my-host", record.getValue("logsource")); + assertNull(record.getValue("facility")); + assertNull(record.getValue("priority")); + assertEquals("nifi", record.getValue("program")); + assertEquals("12345", record.getValue("pid")); + assertEquals("My Message", record.getValue("message")); + + assertNull(deserializer.nextRecord()); + } + } + }