NIFI-9203 Improve GrokReader to be able to handle complex grok expression properly.

This closes #5376.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2021-09-09 18:15:21 +02:00 committed by Peter Turcsanyi
parent 1ab47ce931
commit 229f45997d
5 changed files with 195 additions and 8 deletions

View File

@ -153,6 +153,7 @@
<exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude> <exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude>
<exclude>src/test/resources/grok/nifi-log-sample.log</exclude> <exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
<exclude>src/test/resources/grok/single-line-log-messages.txt</exclude> <exclude>src/test/resources/grok/single-line-log-messages.txt</exclude>
<exclude>src/test/resources/grok/grok_patterns.txt</exclude>
<exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude> <exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
<exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude> <exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude>
<exclude>src/test/resources/json/bank-account-array.json</exclude> <exclude>src/test/resources/json/bank-account-array.json</exclude>

View File

@ -55,6 +55,7 @@ import java.io.Reader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -191,7 +192,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
} }
static RecordSchema createRecordSchema(final Grok grok) { static RecordSchema createRecordSchema(final Grok grok) {
final List<RecordField> fields = new ArrayList<>(); final Set<RecordField> fields = new LinkedHashSet<>();
String grokExpression = grok.getOriginalGrokPattern(); String grokExpression = grok.getOriginalGrokPattern();
populateSchemaFieldNames(grok, grokExpression, fields); populateSchemaFieldNames(grok, grokExpression, fields);
@ -199,11 +200,12 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType(), true)); fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true)); fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true));
final RecordSchema schema = new SimpleRecordSchema(fields); final RecordSchema schema = new SimpleRecordSchema(new ArrayList<>(fields));
return schema; return schema;
} }
private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List<RecordField> fields) { private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final Collection<RecordField> fields) {
final Set<String> namedGroups = GrokUtils.getNameGroups(GrokUtils.GROK_PATTERN.pattern()); final Set<String> namedGroups = GrokUtils.getNameGroups(GrokUtils.GROK_PATTERN.pattern());
while (grokExpression.length() > 0) { while (grokExpression.length() > 0) {
final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression); final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression);

View File

@ -24,8 +24,10 @@ import java.io.InputStreamReader;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReader;
@ -142,12 +144,19 @@ public class GrokRecordReader implements RecordReader {
final Object normalizedValue; final Object normalizedValue;
if (rawValue instanceof List) { if (rawValue instanceof List) {
final List<?> list = (List<?>) rawValue; final List<?> list = (List<?>) rawValue;
final String[] array = new String[list.size()]; List<?> nonNullElements = list.stream().filter(Objects::nonNull).collect(Collectors.toList());
for (int i = 0; i < list.size(); i++) { if (nonNullElements.size() == 0) {
final Object rawObject = list.get(i); normalizedValue = null;
array[i] = rawObject == null ? null : rawObject.toString(); } else if (nonNullElements.size() == 1) {
normalizedValue = nonNullElements.get(0).toString();
} else {
final String[] array = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
final Object rawObject = list.get(i);
array[i] = rawObject == null ? null : rawObject.toString();
}
normalizedValue = array;
} }
normalizedValue = array;
} else { } else {
normalizedValue = rawValue == null ? null : rawValue.toString(); normalizedValue = rawValue == null ? null : rawValue.toString();
} }

View File

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.grok;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.EqualsWrapper;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
public class TestGrokReader {
private TestRunner runner;
private List<Record> records;
private static final PropertyDescriptor READER = new PropertyDescriptor.Builder()
.name("reader")
.identifiesControllerService(GrokReader.class)
.build();
@BeforeEach
void setUp() {
Processor processor = new AbstractProcessor() {
Relationship SUCCESS = new Relationship.Builder()
.name("success")
.build();
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
final RecordReaderFactory readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class);
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
Record record;
while ((record = reader.nextRecord()) != null) {
records.add(record);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
session.transfer(flowFile, SUCCESS);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(READER);
}
@Override
public Set<Relationship> getRelationships() {
return new HashSet<>(Arrays.asList(SUCCESS));
}
};
runner = TestRunners.newTestRunner(processor);
records = new ArrayList<>();
}
@Test
void testComplexGrokExpression() throws Exception {
// GIVEN
String input = "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage" + System.lineSeparator()
+ "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2" + System.lineSeparator();
String grokPatternFile = "src/test/resources/grok/grok_patterns.txt";
String grokExpression = "%{LINE}";
SimpleRecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("timestamp", RecordFieldType.STRING.getDataType()),
new RecordField("facility", RecordFieldType.STRING.getDataType()),
new RecordField("priority", RecordFieldType.STRING.getDataType()),
new RecordField("logsource", RecordFieldType.STRING.getDataType()),
new RecordField("program", RecordFieldType.STRING.getDataType()),
new RecordField("pid", RecordFieldType.STRING.getDataType()),
new RecordField("message", RecordFieldType.STRING.getDataType()),
new RecordField("stackTrace", RecordFieldType.STRING.getDataType()),
new RecordField("_raw", RecordFieldType.STRING.getDataType())
));
List<Record> expectedRecords = Arrays.asList(
new MapRecord(expectedSchema, new HashMap<String, Object>() {{
put("timestamp", "1021-09-09 09:03:06");
put("facility", null);
put("priority", null);
put("logsource", "127.0.0.1");
put("program", "nifi");
put("pid", "1000");
put("message", " LogMessage");
put("stackstrace", null);
put("_raw", "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage");
}}),
new MapRecord(expectedSchema, new HashMap<String, Object>() {{
put("timestamp", "October 19 19:13:16");
put("facility", null);
put("priority", null);
put("logsource", "127.0.0.1");
put("program", "nifi");
put("pid", "1000");
put("message", " LogMessage2");
put("stackstrace", null);
put("_raw", "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2");
}})
);
// WHEN
GrokReader grokReader = new GrokReader();
runner.addControllerService("grokReader", grokReader);
runner.setProperty(READER, "grokReader");
runner.setProperty(grokReader, GrokReader.PATTERN_FILE, grokPatternFile);
runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, grokExpression);
runner.enableControllerService(grokReader);
runner.enqueue(input);
runner.run();
// THEN
List<Function<Record, Object>> propertyProviders = Arrays.asList(
Record::getSchema,
Record::getValues
);
List<EqualsWrapper<Record>> wrappedExpected = EqualsWrapper.wrapList(expectedRecords, propertyProviders);
List<EqualsWrapper<Record>> wrappedActual = EqualsWrapper.wrapList(records, propertyProviders);
Assertions.assertEquals(wrappedExpected, wrappedActual);
}
}

View File

@ -0,0 +1,4 @@
SYSLOGBASE_ISO8601 %{TIMESTAMP_ISO8601:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
LINE_1 %{SYSLOGBASE}%{GREEDYDATA:message}
LINE_2 %{SYSLOGBASE_ISO8601}%{GREEDYDATA:message}
LINE (?:%{LINE_1}|%{LINE_2})