NIFI-3964 Updated Grok Patterns to support TEXT and URL Resources

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #6260.
This commit is contained in:
exceptionfactory 2022-07-29 16:43:36 -05:00 committed by Pierre Villard
parent 0fd262efca
commit fdf3925f81
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
5 changed files with 91 additions and 79 deletions

View File

@ -44,19 +44,12 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
@ -97,12 +90,13 @@ public class ExtractGrok extends AbstractProcessor {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor GROK_PATTERNS = new PropertyDescriptor.Builder()
.name("Grok Pattern file")
.description("Grok Pattern file definition. This file will be loaded after the default Grok "
+ "patterns file. If not set, then only the Grok Expression and the default Grok patterns will be used.")
.displayName("Grok Patterns")
.description("Custom Grok pattern definitions. These definitions will be loaded after the default Grok "
+ "patterns. The Grok Parser will use the default Grok patterns when this property is not configured.")
.required(false)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT, ResourceType.URL)
.build();
public static final PropertyDescriptor KEEP_EMPTY_CAPTURES = new PropertyDescriptor.Builder()
@ -164,7 +158,6 @@ public class ExtractGrok extends AbstractProcessor {
private final static List<PropertyDescriptor> descriptors;
private final static Set<Relationship> relationships;
private volatile GrokCompiler grokCompiler;
private volatile Grok grok;
private final BlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();
@ -178,7 +171,7 @@ public class ExtractGrok extends AbstractProcessor {
final List<PropertyDescriptor> _descriptors = new ArrayList<>();
_descriptors.add(GROK_EXPRESSION);
_descriptors.add(GROK_PATTERN_FILE);
_descriptors.add(GROK_PATTERNS);
_descriptors.add(DESTINATION);
_descriptors.add(CHARACTER_SET);
_descriptors.add(MAX_BUFFER_SIZE);
@ -221,15 +214,13 @@ public class ExtractGrok extends AbstractProcessor {
try {
try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(in);
try (final InputStream defaultPatterns = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME)) {
grokCompiler.register(defaultPatterns);
}
if (validationContext.getProperty(GROK_PATTERN_FILE).isSet()) {
try (final InputStream in = validationContext.getProperty(GROK_PATTERN_FILE).asResource().read();
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
if (validationContext.getProperty(GROK_PATTERNS).isSet()) {
try (final InputStream patterns = validationContext.getProperty(GROK_PATTERNS).asResource().read()) {
grokCompiler.register(patterns);
}
}
grok = grokCompiler.compile(input, namedCaptures);
@ -258,17 +249,15 @@ public class ExtractGrok extends AbstractProcessor {
bufferQueue.add(buffer);
}
grokCompiler = GrokCompiler.newInstance();
final GrokCompiler grokCompiler = GrokCompiler.newInstance();
try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(in);
try (final InputStream defaultPatterns = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME)) {
grokCompiler.register(defaultPatterns);
}
if (context.getProperty(GROK_PATTERN_FILE).isSet()) {
try (final InputStream in = new FileInputStream(new File(context.getProperty(GROK_PATTERN_FILE).getValue()));
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
if (context.getProperty(GROK_PATTERNS).isSet()) {
try (final InputStream patterns = context.getProperty(GROK_PATTERNS).asResource().read()) {
grokCompiler.register(patterns);
}
}
grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean());
@ -292,12 +281,7 @@ public class ExtractGrok extends AbstractProcessor {
try {
final byte[] byteBuffer = buffer;
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, byteBuffer, false);
}
});
session.read(flowFile, in -> StreamUtils.fillBuffer(in, byteBuffer, false));
final long len = Math.min(byteBuffer.length, flowFile.getSize());
contentString = new String(byteBuffer, 0, (int) len, charset);
} finally {
@ -310,7 +294,7 @@ public class ExtractGrok extends AbstractProcessor {
if (captureMap.isEmpty()) {
session.transfer(flowFile, REL_NO_MATCH);
getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile});
getLogger().info("Did not match any Grok Expressions for FlowFile {}", flowFile);
return;
}
@ -327,16 +311,11 @@ public class ExtractGrok extends AbstractProcessor {
flowFile = session.putAllAttributes(flowFile, grokResults);
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_MATCH);
getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile});
getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", grokResults.size(), flowFile);
break;
case FLOWFILE_CONTENT:
FlowFile conFlowfile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
out.write(objectMapper.writeValueAsBytes(captureMap));
}
});
FlowFile conFlowfile = session.write(flowFile, outputStream -> objectMapper.writeValue(outputStream, captureMap));
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content with parsed Grok fields and values", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(conFlowfile, REL_MATCH);

View File

@ -46,10 +46,22 @@ public class TestExtractGrok {
testRunner.assertNotValid();
}
@Test
public void testExtractGrokPatternsProperty() {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{USERNAME:username} - %{DATA}");
testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "USERNAME [a-zA-Z0-9._-]+");
testRunner.enqueue("admin - 127.0.0.1");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH);
final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0);
matched.assertAttributeEquals("grok.username","admin");
}
@Test
public void testExtractGrokWithMatchedContent() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH);
@ -66,7 +78,7 @@ public class TestExtractGrok {
}
@Test
public void testExtractGrokKeepEmptyCaptures() throws Exception {
public void testExtractGrokKeepEmptyCaptures() {
String expression = "%{NUMBER}|%{NUMBER}";
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION,expression);
testRunner.enqueue("-42");
@ -77,7 +89,7 @@ public class TestExtractGrok {
}
@Test
public void testExtractGrokDoNotKeepEmptyCaptures() throws Exception {
public void testExtractGrokDoNotKeepEmptyCaptures() {
String expression = "%{NUMBER}|%{NUMBER}";
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION,expression);
testRunner.setProperty(ExtractGrok.KEEP_EMPTY_CAPTURES,"false");
@ -92,7 +104,7 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithUnMatchedContent() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{URI}");
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_TEXT_INPUT);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_NO_MATCH);
@ -103,7 +115,7 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithNotFoundPatternFile() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/toto_file");
testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "file:///src/test/resources/TestExtractGrok/file-not-found");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.assertNotValid();
}
@ -111,7 +123,7 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithBadGrokExpression() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{TOTO");
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.assertNotValid();
}
@ -119,7 +131,7 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithNamedCapturesOnly() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "true");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.run();

View File

@ -21,38 +21,29 @@ import io.krakens.grok.api.GrokCompiler;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.resource.ResourceReference;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
public class GrokExpressionValidator implements Validator {
private GrokCompiler grokCompiler;
private String patternFileName;
private final GrokCompiler grokCompiler;
private final ResourceReference patternsReference;
public GrokExpressionValidator(String patternFileName, GrokCompiler compiler) {
this.patternFileName = patternFileName;
public GrokExpressionValidator(ResourceReference patternsReference, GrokCompiler compiler) {
this.patternsReference = patternsReference;
this.grokCompiler = compiler;
}
public GrokExpressionValidator() {
this.grokCompiler = GrokCompiler.newInstance();
}
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
try {
try (final InputStream in = getClass().getResourceAsStream(GrokReader.DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
try (final InputStream in = getClass().getResourceAsStream(GrokReader.DEFAULT_PATTERN_NAME)) {
grokCompiler.register(in);
}
if (patternFileName != null) {
try (final InputStream in = new FileInputStream(new File(patternFileName));
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
if (patternsReference != null) {
try (final InputStream patterns = patternsReference.read()) {
grokCompiler.register(patterns);
}
}
grokCompiler.compile(input);

View File

@ -94,12 +94,13 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
+ "The schema will also include a `stackTrace` field, and a `_raw` field containing the input line string."
);
static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder()
static final PropertyDescriptor GROK_PATTERNS = new PropertyDescriptor.Builder()
.name("Grok Pattern File")
.description("Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
+ "will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's "
.displayName("Grok Patterns")
.description("Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
+ "will be used. If specified, all patterns specified will override the default patterns. See the Controller Service's "
+ "Additional Details for a list of pre-defined patterns.")
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.URL, ResourceType.TEXT)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.build();
@ -130,7 +131,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PATTERN_FILE);
properties.add(GROK_PATTERNS);
properties.add(GROK_EXPRESSION);
properties.add(NO_MATCH_BEHAVIOR);
return properties;
@ -144,10 +145,9 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
grokCompiler.register(defaultPatterns);
}
if (context.getProperty(PATTERN_FILE).isSet()) {
try (final InputStream in = context.getProperty(PATTERN_FILE).evaluateAttributeExpressions().asResource().read();
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
if (context.getProperty(GROK_PATTERNS).isSet()) {
try (final InputStream patterns = context.getProperty(GROK_PATTERNS).evaluateAttributeExpressions().asResource().read()) {
grokCompiler.register(patterns);
}
}
@ -191,8 +191,8 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
.build());
}
final String patternFileName = validationContext.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue();
final GrokExpressionValidator validator = new GrokExpressionValidator(patternFileName, grokCompiler);
final ResourceReference patternsReference = validationContext.getProperty(GROK_PATTERNS).evaluateAttributeExpressions().asResource();
final GrokExpressionValidator validator = new GrokExpressionValidator(patternsReference, grokCompiler);
try {
final List<String> grokExpressions = readGrokExpressions(validationContext);

View File

@ -112,7 +112,7 @@ public class TestGrokReader {
final GrokReader grokReader = new GrokReader();
runner.addControllerService(GrokReader.class.getSimpleName(), grokReader);
runner.setProperty(grokReader, GrokReader.PATTERN_FILE, grokPatternFile);
runner.setProperty(grokReader, GrokReader.GROK_PATTERNS, grokPatternFile);
runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, grokExpression);
runner.enableControllerService(grokReader);
@ -171,4 +171,34 @@ public class TestGrokReader {
assertNull(recordReader.nextRecord());
}
@Test
public void testPatternsProperty() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException {
final String program = "NiFi";
final String level = "INFO";
final String message = "Processing Started";
final String logs = String.format("%s %s %s%n", program, level, message);
final byte[] bytes = logs.getBytes(StandardCharsets.UTF_8);
final String matchingExpression = "%{PROGRAM:program} %{LOGLEVEL:level} %{GREEDYDATA:message}";
final GrokReader grokReader = new GrokReader();
runner.addControllerService(GrokReader.class.getSimpleName(), grokReader);
runner.setProperty(grokReader, GrokReader.GROK_PATTERNS, "PROGRAM [a-zA-Z]+");
runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, matchingExpression);
runner.setProperty(grokReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, GrokReader.STRING_FIELDS_FROM_GROK_EXPRESSION);
runner.enableControllerService(grokReader);
final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
final RecordReader recordReader = grokReader.createRecordReader(Collections.emptyMap(), inputStream, bytes.length, runner.getLogger());
final Record firstRecord = recordReader.nextRecord();
assertNotNull(firstRecord);
assertEquals(program, firstRecord.getValue(PROGRAM_FIELD));
assertEquals(level, firstRecord.getValue(LEVEL_FIELD));
assertEquals(message, firstRecord.getValue(MESSAGE_FIELD));
assertNull(recordReader.nextRecord());
}
}