NIFI-9850 Add support for multiple expressions to GrokReader (#5918)

* NIFI-9850 Added support for multiple expressions to GrokReader

- Updated Grok Expression property to support Resources

* NIFI-9850 Updated documentation for Fields from Grok Expression strategy

This closes #5918
Signed-off-by: Otto Fowler <otto@apache.org>
This commit is contained in:
exceptionfactory 2022-04-02 12:30:55 -05:00 committed by GitHub
parent f57facdbcf
commit e6e4109cf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 261 additions and 213 deletions

View File

@ -29,6 +29,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
@ -48,6 +49,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@ -60,6 +62,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", "reader", "regex", "pattern", "logstash"})
@CapabilityDescription("Provides a mechanism for reading unstructured text data, such as log files, and structuring the data "
@ -71,8 +74,7 @@ import java.util.regex.Matcher;
+ "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String). "
+ "Assuming that the schema includes a '_raw' field of type String, the raw message will be included in the Record.")
public class GrokReader extends SchemaRegistryService implements RecordReaderFactory {
private volatile GrokCompiler grokCompiler;
private volatile Grok grok;
private volatile List<Grok> groks;
private volatile NoMatchStrategy noMatchStrategy;
private volatile RecordSchema recordSchema;
private volatile RecordSchema recordSchemaFromGrok;
@ -87,8 +89,10 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
"The line of text that does not match the Grok Expression will only be added to the _raw field.");
static final AllowableValue STRING_FIELDS_FROM_GROK_EXPRESSION = new AllowableValue("string-fields-from-grok-expression", "Use String Fields From Grok Expression",
"The schema will be derived by using the field names present in the Grok Expression. All fields will be assumed to be of type String. Additionally, a field will be included "
+ "with a name of 'stackTrace' and a type of String.");
"The schema will be derived using the field names present in all configured Grok Expressions. "
+ "All schema fields will have a String type and will be marked as nullable. "
+ "The schema will also include a `stackTrace` field, and a `_raw` field containing the input line string."
);
static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder()
.name("Grok Pattern File")
@ -102,10 +106,14 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder()
.name("Grok Expression")
.displayName("Grok Expressions")
.description("Specifies the format of a log line in Grok format. This allows the Record Reader to understand how to parse each log line. "
+ "If a line in the log file does not match this pattern, the line will be assumed to belong to the previous log message."
+ "If other Grok expressions are referenced by this expression, they need to be supplied in the Grok Pattern File")
+ "The property supports one or more Grok expressions. The Reader attempts to parse input lines according to the configured order of the expressions."
+ "If a line in the log file does not match any expressions, the line will be assumed to belong to the previous log message."
+ "If other Grok patterns are referenced by this expression, they need to be supplied in the Grok Pattern File property."
)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.TEXT, ResourceType.URL, ResourceType.FILE)
.required(true)
.build();
@ -130,11 +138,10 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@OnEnabled
public void preCompile(final ConfigurationContext context) throws GrokException, IOException {
grokCompiler = GrokCompiler.newInstance();
GrokCompiler grokCompiler = GrokCompiler.newInstance();
try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
try (final Reader defaultPatterns = getDefaultPatterns()) {
grokCompiler.register(defaultPatterns);
}
if (context.getProperty(PATTERN_FILE).isSet()) {
@ -144,10 +151,11 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
}
}
grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue());
groks = readGrokExpressions(context).stream()
.map(grokCompiler::compile)
.collect(Collectors.toList());
if(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue())) {
if (context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue())) {
noMatchStrategy = NoMatchStrategy.APPEND;
} else if (context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(RAW_LINE.getValue())) {
noMatchStrategy = NoMatchStrategy.RAW;
@ -155,7 +163,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
noMatchStrategy = NoMatchStrategy.SKIP;
}
this.recordSchemaFromGrok = createRecordSchema(grok);
this.recordSchemaFromGrok = createRecordSchema(groks);
final String schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
if (STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
@ -167,42 +175,61 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
ArrayList<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
// validate the grok expression against configuration
GrokCompiler grokCompiler = GrokCompiler.newInstance();
String subject = GROK_EXPRESSION.getName();
String input = validationContext.getProperty(GROK_EXPRESSION).getValue();
GrokExpressionValidator validator;
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final GrokCompiler grokCompiler = GrokCompiler.newInstance();
try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
} catch (IOException e) {
final String expressionSubject = GROK_EXPRESSION.getDisplayName();
try (final Reader defaultPatterns = getDefaultPatterns()) {
grokCompiler.register(defaultPatterns);
} catch (final IOException e) {
results.add(new ValidationResult.Builder()
.input(input)
.subject(subject)
.input("Default Grok Patterns")
.subject(expressionSubject)
.valid(false)
.explanation("Unable to load default patterns: " + e.getMessage())
.build());
}
validator = new GrokExpressionValidator(validationContext.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue(),grokCompiler);
results.add(validator.validate(subject,input,validationContext));
final String patternFileName = validationContext.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue();
final GrokExpressionValidator validator = new GrokExpressionValidator(patternFileName, grokCompiler);
try {
final List<String> grokExpressions = readGrokExpressions(validationContext);
final List<ValidationResult> grokExpressionResults = grokExpressions.stream()
.map(grokExpression -> validator.validate(expressionSubject, grokExpression, validationContext)).collect(Collectors.toList());
results.addAll(grokExpressionResults);
} catch (final IOException e) {
results.add(new ValidationResult.Builder()
.input("Configured Grok Expressions")
.subject(expressionSubject)
.valid(false)
.explanation(String.format("Read Grok Expressions failed: %s", e.getMessage()))
.build());
}
return results;
}
static RecordSchema createRecordSchema(final Grok grok) {
private List<String> readGrokExpressions(final PropertyContext propertyContext) throws IOException {
final ResourceReference expressionsResource = propertyContext.getProperty(GROK_EXPRESSION).asResource();
try (
final InputStream expressionsStream = expressionsResource.read();
final BufferedReader expressionsReader = new BufferedReader(new InputStreamReader(expressionsStream))
) {
return expressionsReader.lines().collect(Collectors.toList());
}
}
/**
 * Derives a String-typed schema from the field names present in all compiled expressions,
 * adding nullable stackTrace and _raw fields.
 *
 * @param groks compiled Grok expressions to gather field names from
 * @return schema containing each discovered field as a nullable String field
 */
static RecordSchema createRecordSchema(final List<Grok> groks) {
    // LinkedHashSet preserves discovery order while de-duplicating names shared across expressions
    final Set<RecordField> fields = new LinkedHashSet<>();
    groks.forEach(grok -> populateSchemaFieldNames(grok, grok.getOriginalGrokPattern(), fields));
    fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType(), true));
    fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true));
    return new SimpleRecordSchema(new ArrayList<>(fields));
}
private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final Collection<RecordField> fields) {
@ -267,7 +294,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException {
public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) {
return recordSchema;
}
@ -281,6 +308,14 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, noMatchStrategy);
return new GrokRecordReader(in, groks, schema, recordSchemaFromGrok, noMatchStrategy);
}
/**
 * Opens the bundled default Grok pattern definitions as a Reader.
 *
 * @return reader over the classpath pattern resource
 * @throws IOException if the classpath resource is missing
 */
private Reader getDefaultPatterns() throws IOException {
    final InputStream patternStream = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
    if (patternStream == null) {
        throw new IOException(String.format("Default Patterns [%s] not found", DEFAULT_PATTERN_NAME));
    }
    // NOTE(review): reader uses the platform default charset — confirm the patterns file is ASCII-safe
    return new InputStreamReader(patternStream);
}
}

View File

@ -21,6 +21,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -40,14 +41,13 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import io.krakens.grok.api.Grok;
import io.krakens.grok.api.Match;
public class GrokRecordReader implements RecordReader {
private final BufferedReader reader;
private final Grok grok;
private final List<Grok> groks;
private final NoMatchStrategy noMatchStrategy;
private final RecordSchema schemaFromGrok;
private RecordSchema schema;
private final RecordSchema schema;
private String nextLine;
Map<String, Object> nextMap = null;
@ -56,15 +56,19 @@ public class GrokRecordReader implements RecordReader {
static final String RAW_MESSAGE_NAME = "_raw";
private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
"^\\s*(?:(?: |\\t)+at )|"
+ "(?:(?: |\\t)+\\[CIRCULAR REFERENCE\\:)|"
+ "(?:Caused by\\: )|"
+ "(?:Suppressed\\: )|"
"^\\s*(?:(?:\\s{4}|\\t)+at )|"
+ "(?:(?:\\s{4}|\\t)+\\[CIRCULAR REFERENCE:)|"
+ "(?:Caused by: )|"
+ "(?:Suppressed: )|"
+ "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
// Convenience constructor for a single Grok expression: wraps it in a singleton list
// and delegates to the multi-expression constructor.
public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema, final RecordSchema schemaFromGrok, final NoMatchStrategy noMatchStrategy) {
    this(in, Collections.singletonList(grok), schema, schemaFromGrok, noMatchStrategy);
}
public GrokRecordReader(final InputStream in, final List<Grok> groks, final RecordSchema schema, final RecordSchema schemaFromGrok, final NoMatchStrategy noMatchStrategy) {
this.reader = new BufferedReader(new InputStreamReader(in));
this.grok = grok;
this.groks = groks;
this.schema = schema;
this.noMatchStrategy = noMatchStrategy;
this.schemaFromGrok = schemaFromGrok;
@ -91,10 +95,8 @@ public class GrokRecordReader implements RecordReader {
return null;
}
final Match match = grok.match(line);
valueMap = match.capture();
if((valueMap == null || valueMap.isEmpty()) && noMatchStrategy.equals(NoMatchStrategy.RAW)) {
valueMap = capture(line);
if ((valueMap == null || valueMap.isEmpty()) && noMatchStrategy.equals(NoMatchStrategy.RAW)) {
break;
}
}
@ -108,8 +110,7 @@ public class GrokRecordReader implements RecordReader {
String stackTrace = null;
final StringBuilder trailingText = new StringBuilder();
while ((nextLine = reader.readLine()) != null) {
final Match nextLineMatch = grok.match(nextLine);
final Map<String, Object> nextValueMap = nextLineMatch.capture();
final Map<String, Object> nextValueMap = capture(nextLine);
if (nextValueMap.isEmpty() && !noMatchStrategy.equals(NoMatchStrategy.RAW)) {
// next line did not match. Check if it indicates a Stack Trace. If so, read until
// the stack trace ends. Otherwise, append the next line to the last field in the record.
@ -128,14 +129,13 @@ public class GrokRecordReader implements RecordReader {
}
}
final Record record = createRecord(valueMap, trailingText, stackTrace, raw.toString(), coerceTypes, dropUnknownFields);
return record;
return createRecord(valueMap, trailingText, stackTrace, raw.toString(), coerceTypes);
}
private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final String raw, final boolean coerceTypes, final boolean dropUnknown) {
private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final String raw, final boolean coerceTypes) {
final Map<String, Object> converted = new HashMap<>();
if(valueMap != null && !valueMap.isEmpty()) {
if (valueMap != null && !valueMap.isEmpty()) {
for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
final String fieldName = entry.getKey();
@ -203,7 +203,7 @@ public class GrokRecordReader implements RecordReader {
if (value == null) {
converted.put(lastPopulatedFieldName, trailingText.toString());
} else if (value instanceof String) { // if not a String it is a List and we will just drop the trailing text
converted.put(lastPopulatedFieldName, (String) value + trailingText.toString());
converted.put(lastPopulatedFieldName, value + trailingText.toString());
}
}
}
@ -238,11 +238,7 @@ public class GrokRecordReader implements RecordReader {
return false;
}
if (line.indexOf(" ") < index) {
return false;
}
return true;
return line.indexOf(" ") >= index;
}
private String readStackTrace(final String firstLine) throws IOException {
@ -291,4 +287,16 @@ public class GrokRecordReader implements RecordReader {
return schema;
}
/**
 * Attempts each compiled expression against the line in configured order;
 * the first non-empty capture wins.
 *
 * @param log the input line to match
 * @return the captured field values, or an empty map when no expression matches
 */
private Map<String, Object> capture(final String log) {
    for (final Grok grok : groks) {
        final Map<String, Object> matched = grok.capture(log);
        if (!matched.isEmpty()) {
            return matched;
        }
    }
    return Collections.emptyMap();
}
}

View File

@ -16,90 +16,58 @@
*/
package org.apache.nifi.grok;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.EqualsWrapper;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.util.ArrayList;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class TestGrokReader {
private TestRunner runner;
private List<Record> records;
private static final PropertyDescriptor READER = new PropertyDescriptor.Builder()
.name("reader")
.identifiesControllerService(GrokReader.class)
.build();
private static final String TIMESTAMP_FIELD = "timestamp";
private static final String LEVEL_FIELD = "level";
private static final String FACILITY_FIELD = "facility";
private static final String PROGRAM_FIELD = "program";
private static final String MESSAGE_FIELD = "message";
private static final String STACKTRACE_FIELD = "stackTrace";
private static final String RAW_FIELD = "_raw";
@BeforeEach
void setUp() {
    // The reader is exercised directly via createRecordReader, so a no-op processor
    // is sufficient to host the controller service.
    runner = TestRunners.newTestRunner(NoOpProcessor.class);
}
@Test
void testComplexGrokExpression() throws Exception {
// GIVEN
String input = "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage" + System.lineSeparator()
+ "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2" + System.lineSeparator();
@ -107,65 +75,100 @@ public class TestGrokReader {
String grokExpression = "%{LINE}";
SimpleRecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("timestamp", RecordFieldType.STRING.getDataType()),
new RecordField("facility", RecordFieldType.STRING.getDataType()),
new RecordField(TIMESTAMP_FIELD, RecordFieldType.STRING.getDataType()),
new RecordField(FACILITY_FIELD, RecordFieldType.STRING.getDataType()),
new RecordField("priority", RecordFieldType.STRING.getDataType()),
new RecordField("logsource", RecordFieldType.STRING.getDataType()),
new RecordField("program", RecordFieldType.STRING.getDataType()),
new RecordField(PROGRAM_FIELD, RecordFieldType.STRING.getDataType()),
new RecordField("pid", RecordFieldType.STRING.getDataType()),
new RecordField("message", RecordFieldType.STRING.getDataType()),
new RecordField("stackTrace", RecordFieldType.STRING.getDataType()),
new RecordField("_raw", RecordFieldType.STRING.getDataType())
new RecordField(MESSAGE_FIELD, RecordFieldType.STRING.getDataType()),
new RecordField(STACKTRACE_FIELD, RecordFieldType.STRING.getDataType()),
new RecordField(RAW_FIELD, RecordFieldType.STRING.getDataType())
));
List<Record> expectedRecords = Arrays.asList(
new MapRecord(expectedSchema, new HashMap<String, Object>() {{
put("timestamp", "1021-09-09 09:03:06");
put("facility", null);
put("priority", null);
put("logsource", "127.0.0.1");
put("program", "nifi");
put("pid", "1000");
put("message", " LogMessage");
put("stackstrace", null);
put("_raw", "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage");
}}),
new MapRecord(expectedSchema, new HashMap<String, Object>() {{
put("timestamp", "October 19 19:13:16");
put("facility", null);
put("priority", null);
put("logsource", "127.0.0.1");
put("program", "nifi");
put("pid", "1000");
put("message", " LogMessage2");
put("stackstrace", null);
put("_raw", "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2");
}})
);
final Record expectedFirstRecord = new MapRecord(expectedSchema, new HashMap<String, Object>() {{
put(TIMESTAMP_FIELD, "1021-09-09 09:03:06");
put(FACILITY_FIELD, null);
put("priority", null);
put("logsource", "127.0.0.1");
put(PROGRAM_FIELD, "nifi");
put("pid", "1000");
put("message", " LogMessage");
put(STACKTRACE_FIELD, null);
put(RAW_FIELD, "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage");
}});
// WHEN
GrokReader grokReader = new GrokReader();
runner.addControllerService("grokReader", grokReader);
runner.setProperty(READER, "grokReader");
final Record expectedSecondRecord = new MapRecord(expectedSchema, new HashMap<String, Object>() {{
put(TIMESTAMP_FIELD, "October 19 19:13:16");
put(FACILITY_FIELD, null);
put("priority", null);
put("logsource", "127.0.0.1");
put(PROGRAM_FIELD, "nifi");
put("pid", "1000");
put(MESSAGE_FIELD, " LogMessage2");
put(STACKTRACE_FIELD, null);
put(RAW_FIELD, "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2");
}});
final GrokReader grokReader = new GrokReader();
runner.addControllerService(GrokReader.class.getSimpleName(), grokReader);
runner.setProperty(grokReader, GrokReader.PATTERN_FILE, grokPatternFile);
runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, grokExpression);
runner.enableControllerService(grokReader);
runner.enqueue(input);
runner.run();
final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
final ByteArrayInputStream inputStream = new ByteArrayInputStream(inputBytes);
final RecordReader recordReader = grokReader.createRecordReader(Collections.emptyMap(), inputStream, inputBytes.length, runner.getLogger());
// THEN
List<Function<Record, Object>> propertyProviders = Arrays.asList(
Record::getSchema,
Record::getValues
);
final Record firstRecord = recordReader.nextRecord();
List<EqualsWrapper<Record>> wrappedExpected = EqualsWrapper.wrapList(expectedRecords, propertyProviders);
List<EqualsWrapper<Record>> wrappedActual = EqualsWrapper.wrapList(records, propertyProviders);
assertArrayEquals(expectedFirstRecord.getValues(), firstRecord.getValues());
assertEquals(expectedSchema, firstRecord.getSchema());
Assertions.assertEquals(wrappedExpected, wrappedActual);
final Record secondRecord = recordReader.nextRecord();
assertArrayEquals(expectedSecondRecord.getValues(), secondRecord.getValues());
assertEquals(expectedSchema, secondRecord.getSchema());
assertNull(recordReader.nextRecord());
}
@Test
public void testMultipleExpressions() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException {
    final String program = "NiFi";
    final String level = "INFO";
    final String message = "Processing Started";
    final String timestamp = "Jan 10 12:30:45";

    // Two input lines: the first lacks a timestamp and should fall through to the
    // second expression; the second carries a timestamp and matches the first expression.
    final String logs = String.format("%s %s %s%n%s %s %s %s%n", program, level, message, timestamp, program, level, message);
    final byte[] logBytes = logs.getBytes(StandardCharsets.UTF_8);

    final String firstExpression = "%{SYSLOGTIMESTAMP:timestamp} %{PROG:program} %{LOGLEVEL:level} %{GREEDYDATA:message}";
    final String matchingExpression = "%{PROG:program} %{LOGLEVEL:level} %{GREEDYDATA:message}";
    final String expressions = String.format("%s%n%s", firstExpression, matchingExpression);

    final GrokReader grokReader = new GrokReader();
    runner.addControllerService(GrokReader.class.getSimpleName(), grokReader);
    runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, expressions);
    runner.setProperty(grokReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, GrokReader.STRING_FIELDS_FROM_GROK_EXPRESSION);
    runner.enableControllerService(grokReader);

    final ByteArrayInputStream inputStream = new ByteArrayInputStream(logBytes);
    final RecordReader recordReader = grokReader.createRecordReader(Collections.emptyMap(), inputStream, logBytes.length, runner.getLogger());

    // First line: no timestamp captured
    final Record firstRecord = recordReader.nextRecord();
    assertNotNull(firstRecord);
    assertEquals(program, firstRecord.getValue(PROGRAM_FIELD));
    assertEquals(level, firstRecord.getValue(LEVEL_FIELD));
    assertEquals(message, firstRecord.getValue(MESSAGE_FIELD));
    assertNull(firstRecord.getValue(TIMESTAMP_FIELD));

    // Second line: timestamp captured by the first expression
    final Record secondRecord = recordReader.nextRecord();
    assertNotNull(secondRecord);
    assertEquals(program, secondRecord.getValue(PROGRAM_FIELD));
    assertEquals(level, secondRecord.getValue(LEVEL_FIELD));
    assertEquals(message, secondRecord.getValue(MESSAGE_FIELD));
    assertEquals(timestamp, secondRecord.getValue(TIMESTAMP_FIELD));

    assertNull(recordReader.nextRecord());
}
}

View File

@ -28,11 +28,11 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -45,7 +45,7 @@ public class TestGrokRecordReader {
@BeforeAll
public static void beforeClass() throws Exception {
try (final InputStream fis = new FileInputStream(new File("src/main/resources/default-grok-patterns.txt"))) {
try (final InputStream fis = new FileInputStream("src/main/resources/default-grok-patterns.txt")) {
grokCompiler = GrokCompiler.newInstance();
grokCompiler.register(fis);
}
@ -58,9 +58,8 @@ public class TestGrokRecordReader {
@Test
public void testParseSingleLineLogMessages() throws GrokException, IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/single-line-log-messages.txt"))) {
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), NoMatchStrategy.APPEND);
try (final InputStream fis = new FileInputStream("src/test/resources/grok/single-line-log-messages.txt")) {
final GrokRecordReader deserializer = getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}", fis);
final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"};
final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"};
@ -85,15 +84,13 @@ public class TestGrokRecordReader {
}
}
@Test
public void testParseEmptyMessageWithStackTrace() throws GrokException, IOException, MalformedRecordException {
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election Notification Thread-1] o.a.n.LoggerClass \n"
+ "org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces";
final InputStream bais = new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), NoMatchStrategy.APPEND);
final GrokRecordReader deserializer = getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}", bais);
final Object[] values = deserializer.nextRecord().getValues();
@ -110,21 +107,18 @@ public class TestGrokRecordReader {
deserializer.close();
}
@Test
public void testParseNiFiSampleLog() throws IOException, GrokException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/nifi-log-sample.log"))) {
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), NoMatchStrategy.APPEND);
try (final InputStream fis = new FileInputStream("src/test/resources/grok/nifi-log-sample.log")) {
final GrokRecordReader deserializer = getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}", fis);
final String[] logLevels = new String[] {"INFO", "INFO", "INFO", "WARN", "WARN"};
for (int i = 0; i < logLevels.length; i++) {
for (String logLevel : logLevels) {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
assertEquals(7, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
assertEquals(logLevels[i], values[1]);
assertEquals(logLevel, values[1]);
assertNull(values[5]);
assertNotNull(values[6]);
}
@ -136,18 +130,17 @@ public class TestGrokRecordReader {
@Test
public void testParseNiFiSampleMultilineWithStackTrace() throws IOException, GrokException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log"))) {
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), NoMatchStrategy.APPEND);
try (final InputStream fis = new FileInputStream("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log")) {
final GrokRecordReader deserializer = getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?", fis);
final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", "WARN", "WARN"};
for (int i = 0; i < logLevels.length; i++) {
for (String logLevel : logLevels) {
final Record record = deserializer.nextRecord();
final Object[] values = record.getValues();
assertNotNull(values);
assertEquals(7, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
assertEquals(logLevels[i], values[1]);
assertEquals(logLevel, values[1]);
if ("ERROR".equals(values[1])) {
final String msg = (String) values[4];
assertEquals("One\nTwo\nThree", msg);
@ -166,12 +159,10 @@ public class TestGrokRecordReader {
}
}
@Test
public void testParseStackTrace() throws GrokException, IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/error-with-stack-trace.log"))) {
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), NoMatchStrategy.APPEND);
try (final InputStream fis = new FileInputStream("src/test/resources/grok/error-with-stack-trace.log")) {
final GrokRecordReader deserializer = getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}", fis);
final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
final String[] messages = new String[] {"message without stack trace",
@ -223,7 +214,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(msgBytes)) {
final Grok grok = grokCompiler.compile("%{SYSLOGBASE}%{GREEDYDATA:message}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(9, fieldNames.size());
assertTrue(fieldNames.contains("timestamp"));
@ -264,7 +255,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@ -298,7 +289,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@ -311,11 +302,11 @@ public class TestGrokRecordReader {
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, NoMatchStrategy.RAW);
Record record = deserializer.nextRecord();
assertEquals(null, record.getValue("first"));
assertEquals(null, record.getValue("second"));
assertEquals(null, record.getValue("third"));
assertEquals(null, record.getValue("fourth"));
assertEquals(null, record.getValue("fifth"));
assertNull(record.getValue("first"));
assertNull(record.getValue("second"));
assertNull(record.getValue("third"));
assertNull(record.getValue("fourth"));
assertNull(record.getValue("fifth"));
assertEquals("hello there", record.getValue("_raw"));
record = deserializer.nextRecord();
@ -343,7 +334,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@ -381,7 +372,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@ -403,11 +394,11 @@ public class TestGrokRecordReader {
record = deserializer.nextRecord();
assertEquals(null, record.getValue("first"));
assertEquals(null, record.getValue("second"));
assertEquals(null, record.getValue("third"));
assertEquals(null, record.getValue("fourth"));
assertEquals(null, record.getValue("fifth"));
assertNull(record.getValue("first"));
assertNull(record.getValue("second"));
assertNull(record.getValue("third"));
assertNull(record.getValue("fourth"));
assertNull(record.getValue("fifth"));
assertEquals("hello there", record.getValue("_raw"));
record = deserializer.nextRecord();
@ -425,7 +416,7 @@ public class TestGrokRecordReader {
}
@Test
public void testRawUnmatchedRecordlast() throws GrokException, IOException, MalformedRecordException {
public void testRawUnmatchedRecordLast() throws GrokException, IOException, MalformedRecordException {
final String nonMatchingRecord = "hello there";
final String matchingRecord = "1 2 3 4 5";
@ -435,7 +426,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@ -457,15 +448,26 @@ public class TestGrokRecordReader {
record = deserializer.nextRecord();
assertEquals(null, record.getValue("first"));
assertEquals(null, record.getValue("second"));
assertEquals(null, record.getValue("third"));
assertEquals(null, record.getValue("fourth"));
assertEquals(null, record.getValue("fifth"));
assertNull(record.getValue("first"));
assertNull(record.getValue("second"));
assertNull(record.getValue("third"));
assertNull(record.getValue("fourth"));
assertNull(record.getValue("fifth"));
assertEquals("hello there", record.getValue("_raw"));
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
/**
 * Derives a Record Schema from a single compiled Grok expression by delegating
 * to GrokReader.createRecordSchema, which expects a collection of expressions.
 *
 * @param grok compiled Grok expression whose named captures become schema fields
 * @return Record Schema containing the fields declared in the expression
 */
private RecordSchema getRecordSchema(final Grok grok) {
    return GrokReader.createRecordSchema(Collections.singletonList(grok));
}
/**
 * Compiles the given pattern and builds a Grok Record Reader over the supplied
 * stream, using the schema derived from the expression as both the read schema
 * and the record schema, with the APPEND strategy for non-matching lines.
 *
 * @param pattern Grok expression to compile
 * @param inputStream stream of log lines to be parsed
 * @return Grok Record Reader configured with NoMatchStrategy.APPEND
 */
private GrokRecordReader getRecordReader(final String pattern, final InputStream inputStream) {
    // Compile once so the reader and the schema share the same Grok instance
    final Grok compiled = grokCompiler.compile(pattern);
    final RecordSchema schema = getRecordSchema(compiled);
    return new GrokRecordReader(inputStream, compiled, schema, schema, NoMatchStrategy.APPEND);
}
}