diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 2ee02dc2a0..d3fa7f8743 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -306,8 +306,8 @@ bval-jsr - io.thekraken - grok + io.krakens + java-grok org.apache.calcite diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java index a7e421bd9f..c5675ea683 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java @@ -18,9 +18,10 @@ package org.apache.nifi.processors.standard; import com.fasterxml.jackson.databind.ObjectMapper; -import io.thekraken.grok.api.Grok; -import io.thekraken.grok.api.Match; -import io.thekraken.grok.api.exception.GrokException; +import io.krakens.grok.api.Grok; +import io.krakens.grok.api.GrokCompiler; +import io.krakens.grok.api.Match; +import io.krakens.grok.api.exception.GrokException; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -33,15 +34,13 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.FlowFile; - +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; @@ -49,17 +48,22 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.Reader; import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.HashMap; import java.util.Set; -import java.util.HashSet; -import java.util.ArrayList; -import java.util.Collections; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -80,18 +84,21 @@ public class ExtractGrok extends AbstractProcessor { public static final String FLOWFILE_ATTRIBUTE = "flowfile-attribute"; public static final String FLOWFILE_CONTENT = "flowfile-content"; private static final String APPLICATION_JSON = "application/json"; + private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt"; public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder() .name("Grok Expression") - .description("Grok expression") + .description("Grok expression. If other Grok expressions are referenced in this expression, they must be provided " + + "in the Grok Pattern File if set or exist in the default Grok patterns") .required(true) - .addValidator(validateGrokExpression()) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor.Builder() .name("Grok Pattern file") - .description("Grok Pattern file definition") - .required(true) + .description("Grok Pattern file definition. This file will be loaded after the default Grok " + + "patterns file. If not set, then only the Grok Expression and the default Grok patterns will be used.") + .required(false) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .build(); @@ -145,7 +152,8 @@ public class ExtractGrok extends AbstractProcessor { private final static List descriptors; private final static Set relationships; - private volatile Grok grok = new Grok(); + private volatile GrokCompiler grokCompiler; + private volatile Grok grok; private final BlockingQueue bufferQueue = new LinkedBlockingQueue<>(); static { @@ -179,17 +187,73 @@ public class ExtractGrok extends AbstractProcessor { bufferQueue.clear(); } + @Override + protected Collection customValidate(final ValidationContext validationContext) { + Collection problems = new ArrayList<>(); + + // validate the grok expression against configuration + // if there is a GROK_PATTERN_FILE set we must be sure to register that so that it's + // patterns will be available to compile() + // we also have to make sure the default grok patterns are loaded + boolean namedCaptures = false; + if (validationContext.getProperty(NAMED_CAPTURES_ONLY).isSet()) { + namedCaptures = validationContext.getProperty(NAMED_CAPTURES_ONLY).asBoolean(); + } + + GrokCompiler grokCompiler = GrokCompiler.newInstance(); + String subject = GROK_EXPRESSION.getName(); + String input = validationContext.getProperty(GROK_EXPRESSION).getValue(); + + try { + + try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME); + final Reader reader = new InputStreamReader(in)) { + grokCompiler.register(in); + } + + if (validationContext.getProperty(GROK_PATTERN_FILE).isSet()) { + try (final InputStream in = new FileInputStream(new File(validationContext.getProperty(GROK_PATTERN_FILE).getValue())); + final Reader reader = new InputStreamReader(in)) { + grokCompiler.register(reader); + } + } + grok = grokCompiler.compile(input, namedCaptures); + } catch (final Exception e) { + problems.add(new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Not a valid Grok Expression - " + e.getMessage()) + .build()); + return problems; + } + + problems.add(new ValidationResult.Builder().subject(subject).input(input).valid(true).build()); + return problems; + } + @OnScheduled - public void onScheduled(final ProcessContext context) throws GrokException { + public void onScheduled(final ProcessContext context) throws GrokException, IOException { for (int i = 0; i < context.getMaxConcurrentTasks(); i++) { final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final byte[] buffer = new byte[maxBufferSize]; bufferQueue.add(buffer); } - grok = new Grok(); - grok.addPatternFromFile(context.getProperty(GROK_PATTERN_FILE).getValue()); - grok.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean()); + grokCompiler = GrokCompiler.newInstance(); + + try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME); + final Reader reader = new InputStreamReader(in)) { + grokCompiler.register(in); + } + + if (context.getProperty(GROK_PATTERN_FILE).isSet()) { + try (final InputStream in = new FileInputStream(new File(context.getProperty(GROK_PATTERN_FILE).getValue())); + final Reader reader = new InputStreamReader(in)) { + grokCompiler.register(reader); + } + } + grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean()); } @Override @@ -222,9 +286,9 @@ public class ExtractGrok extends AbstractProcessor { } final Match gm = grok.match(contentString); - gm.captures(); + final Map captureMap = gm.capture(); - if (gm.toMap().isEmpty()) { + if (captureMap.isEmpty()) { session.transfer(flowFile, REL_NO_MATCH); getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile}); return; @@ -234,7 +298,7 @@ public class ExtractGrok extends AbstractProcessor { switch (context.getProperty(DESTINATION).getValue()) { case FLOWFILE_ATTRIBUTE: Map grokResults = new HashMap<>(); - for (Map.Entry entry : gm.toMap().entrySet()) { + for (Map.Entry entry : captureMap.entrySet()) { if (null != entry.getValue()) { grokResults.put("grok." + entry.getKey(), entry.getValue().toString()); } @@ -250,7 +314,7 @@ public class ExtractGrok extends AbstractProcessor { FlowFile conFlowfile = session.write(flowFile, new StreamCallback() { @Override public void process(InputStream in, OutputStream out) throws IOException { - out.write(objectMapper.writeValueAsBytes(gm.toMap())); + out.write(objectMapper.writeValueAsBytes(captureMap)); } }); conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); @@ -260,28 +324,4 @@ public class ExtractGrok extends AbstractProcessor { break; } } - - - public static final Validator validateGrokExpression() { - return new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - - Grok grok = new Grok(); - try { - grok.compile(input); - } catch (GrokException | java.util.regex.PatternSyntaxException e) { - return new ValidationResult.Builder() - .subject(subject) - .input(input) - .valid(false) - .explanation("Not a valid Grok Expression - " + e.getMessage()) - .build(); - } - - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); - } - }; - } - } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/default-grok-patterns.txt b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/default-grok-patterns.txt new file mode 100644 index 0000000000..4b110e87f2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/default-grok-patterns.txt @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + + +# Log Levels +LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)|FINE|FINER|FINEST|CONFIG + +# Syslog Dates: Month Day HH:MM:SS +SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME} +PROG (?:[\w._/%-]+) +SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])? +SYSLOGHOST %{IPORHOST} +SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}> +HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT} + +# Months: January, Feb, 3, 03, 12, December +MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b +MONTHNUM (?:0?[1-9]|1[0-2]) +MONTHNUM2 (?:0[1-9]|1[0-2]) +MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) + +# Days: Monday, Tue, Thu, etc... +DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?) + +# Years? +YEAR (?>\d\d){1,2} +HOUR (?:2[0123]|[01]?[0-9]) +MINUTE (?:[0-5][0-9]) +# '60' is a leap second in most time standards and thus is valid. +SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?) +TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9]) + +# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it) +DATE_US_MONTH_DAY_YEAR %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR} +DATE_US_YEAR_MONTH_DAY %{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY} +DATE_US %{DATE_US_MONTH_DAY_YEAR}|%{DATE_US_YEAR_MONTH_DAY} +DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR} +ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE})) +ISO8601_SECOND (?:%{SECOND}|60) +TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? +DATE %{DATE_US}|%{DATE_EU} +DATESTAMP %{DATE}[- ]%{TIME} +TZ (?:[PMCE][SD]T|UTC) +DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ} +DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE} +DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR} +DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND} + + +POSINT \b(?:[1-9][0-9]*)\b +NONNEGINT \b(?:[0-9]+)\b +WORD \b\w+\b +NOTSPACE \S+ +SPACE \s* +DATA .*? +GREEDYDATA .* +QUOTEDSTRING (?>(?"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``)) +UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12} + +USERNAME [a-zA-Z0-9._-]+ +USER %{USERNAME} +INT (?:[+-]?(?:[0-9]+)) +BASE10NUM (?[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))) +NUMBER (?:%{BASE10NUM}) +BASE16NUM (?/(?>[\w_%!$@:.,-]+|\\.)*)+ +TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+)) +WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+ +URIPROTO [A-Za-z]+(\+[A-Za-z+]+)? +URIHOST %{IPORHOST}(?::%{POSINT:port})? +# uripath comes loosely from RFC1738, but mostly from what Firefox +# doesn't turn into %XX +URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+ +#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)? +URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]* +URIPATHPARAM %{URIPATH}(?:%{URIPARAM})? +URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})? + +# Shortcuts +QS %{QUOTEDSTRING} + +# Log formats +SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: +COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) +COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java index b5891ad7dd..ed70e5f614 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java @@ -39,6 +39,13 @@ public class TestExtractGrok { testRunner = TestRunners.newTestRunner(ExtractGrok.class); } + @Test + public void testExtractGrokWithMissingPattern() throws Exception { + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{FOOLOG}"); + testRunner.enqueue(GROK_LOG_INPUT); + testRunner.assertNotValid(); + } + @Test public void testExtractGrokWithMatchedContent() throws IOException { testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}"); @@ -60,7 +67,7 @@ public class TestExtractGrok { @Test public void testExtractGrokWithUnMatchedContent() throws IOException { - testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{ADDRESS}"); + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{URI}"); testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); testRunner.enqueue(GROK_TEXT_INPUT); testRunner.run(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml index 01d888fc00..7c8e3ea396 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml @@ -359,9 +359,9 @@ - io.thekraken - grok - 0.1.5 + io.krakens + java-grok + 0.1.9 com.google.code diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index c57e07cde7..0db421706d 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -77,9 +77,9 @@ 2.6 - io.thekraken - grok - 0.1.5 + io.krakens + java-grok + 0.1.9 org.apache.avro diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java index dd9c4e05d4..7894b33835 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java @@ -17,18 +17,45 @@ package org.apache.nifi.grok; +import io.krakens.grok.api.GrokCompiler; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; -import io.thekraken.grok.api.Grok; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; public class GrokExpressionValidator implements Validator { + private GrokCompiler grokCompiler; + private String patternFileName; + + public GrokExpressionValidator(String patternFileName, GrokCompiler compiler) { + this.patternFileName = patternFileName; + this.grokCompiler = compiler; + } + + public GrokExpressionValidator() { + this.grokCompiler = GrokCompiler.newInstance(); + } @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { try { - new Grok().compile(input); + try (final InputStream in = getClass().getResourceAsStream(GrokReader.DEFAULT_PATTERN_NAME); + final Reader reader = new InputStreamReader(in)) { + grokCompiler.register(in); + } + + if (patternFileName != null) { + try (final InputStream in = new FileInputStream(new File(patternFileName)); + final Reader reader = new InputStreamReader(in)) { + grokCompiler.register(reader); + } + } + grokCompiler.compile(input); } catch (final Exception e) { return new ValidationResult.Builder() .input(input) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 29e963679f..9e7293bc5a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -17,23 +17,17 @@ package org.apache.nifi.grok; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; - +import io.krakens.grok.api.Grok; +import io.krakens.grok.api.GrokCompiler; +import io.krakens.grok.api.GrokUtils; +import io.krakens.grok.api.exception.GrokException; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; @@ -51,9 +45,18 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; -import io.thekraken.grok.api.Grok; -import io.thekraken.grok.api.GrokUtils; -import io.thekraken.grok.api.exception.GrokException; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; @Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", "reader", "regex", "pattern", "logstash"}) @CapabilityDescription("Provides a mechanism for reading unstructured text data, such as log files, and structuring the data " @@ -65,12 +68,13 @@ import io.thekraken.grok.api.exception.GrokException; + "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String). " + "Assuming that the schema includes a '_raw' field of type String, the raw message will be included in the Record.") public class GrokReader extends SchemaRegistryService implements RecordReaderFactory { + private volatile GrokCompiler grokCompiler; private volatile Grok grok; private volatile boolean appendUnmatchedLine; private volatile RecordSchema recordSchema; private volatile RecordSchema recordSchemaFromGrok; - private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt"; + static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt"; static final AllowableValue APPEND_TO_PREVIOUS_MESSAGE = new AllowableValue("append-to-previous-message", "Append to Previous Message", "The line of text that does not match the Grok Expression will be appended to the last field of the prior message."); @@ -94,8 +98,9 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder() .name("Grok Expression") .description("Specifies the format of a log line in Grok format. This allows the Record Reader to understand how to parse each log line. " - + "If a line in the log file does not match this pattern, the line will be assumed to belong to the previous log message.") - .addValidator(new GrokExpressionValidator()) + + "If a line in the log file does not match this pattern, the line will be assumed to belong to the previous log message." + + "If other Grok expressions are referenced by this expression, they need to be supplied in the Grok Pattern File") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .required(true) .build(); @@ -120,18 +125,21 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac @OnEnabled public void preCompile(final ConfigurationContext context) throws GrokException, IOException { - grok = new Grok(); + grokCompiler = GrokCompiler.newInstance(); try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME); final Reader reader = new InputStreamReader(in)) { - grok.addPatternFromReader(reader); + grokCompiler.register(reader); } if (context.getProperty(PATTERN_FILE).isSet()) { - grok.addPatternFromFile(context.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue()); + try (final InputStream in = new FileInputStream(context.getProperty(PATTERN_FILE) + .evaluateAttributeExpressions().getValue()); final Reader reader = new InputStreamReader(in)) { + grokCompiler.register(reader); + } } - grok.compile(context.getProperty(GROK_EXPRESSION).getValue()); + grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue()); appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()); @@ -145,6 +153,32 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac } } + @Override + protected Collection customValidate(final ValidationContext validationContext) { + ArrayList results = new ArrayList<>(super.customValidate(validationContext)); + // validate the grok expression against configuration + GrokCompiler grokCompiler = GrokCompiler.newInstance(); + String subject = GROK_EXPRESSION.getName(); + String input = validationContext.getProperty(GROK_EXPRESSION).getValue(); + GrokExpressionValidator validator; + + try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME); + final Reader reader = new InputStreamReader(in)) { + grokCompiler.register(reader); + } catch (IOException e) { + results.add(new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Unable to load default patterns: " + e.getMessage()) + .build()); + } + + validator = new GrokExpressionValidator(validationContext.getProperty(PATTERN_FILE).getValue(),grokCompiler); + results.add(validator.validate(subject,input,validationContext)); + return results; + } + static RecordSchema createRecordSchema(final Grok grok) { final List fields = new ArrayList<>(); @@ -159,10 +193,11 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac } private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List fields) { + final Set namedGroups = GrokUtils.getNameGroups(GrokUtils.GROK_PATTERN.pattern()); while (grokExpression.length() > 0) { final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression); if (matcher.find()) { - final Map extractedGroups = GrokUtils.namedGroups(matcher, grokExpression); + final Map extractedGroups = GrokUtils.namedGroups(matcher, namedGroups); final String subName = extractedGroups.get("subname"); if (subName == null) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java index b7c397116a..0c82fb4e05 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java @@ -37,8 +37,8 @@ import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; -import io.thekraken.grok.api.Grok; -import io.thekraken.grok.api.Match; +import io.krakens.grok.api.Grok; +import io.krakens.grok.api.Match; public class GrokRecordReader implements RecordReader { private final BufferedReader reader; @@ -90,8 +90,7 @@ public class GrokRecordReader implements RecordReader { } final Match match = grok.match(line); - match.captures(); - valueMap = match.toMap(); + valueMap = match.capture(); } if (iterations == 0 && nextLine != null) { @@ -104,8 +103,7 @@ public class GrokRecordReader implements RecordReader { final StringBuilder trailingText = new StringBuilder(); while ((nextLine = reader.readLine()) != null) { final Match nextLineMatch = grok.match(nextLine); - nextLineMatch.captures(); - final Map nextValueMap = nextLineMatch.toMap(); + final Map nextValueMap = nextLineMatch.capture(); if (nextValueMap.isEmpty()) { // next line did not match. Check if it indicates a Stack Trace. If so, read until // the stack trace ends. Otherwise, append the next line to the last field in the record. diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java index 83286dc96a..538faa8963 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java @@ -17,37 +17,49 @@ package org.apache.nifi.grok; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import io.krakens.grok.api.Grok; +import io.krakens.grok.api.GrokCompiler; +import io.krakens.grok.api.exception.GrokException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.List; -import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordSchema; -import org.junit.Test; - -import io.thekraken.grok.api.Grok; -import io.thekraken.grok.api.exception.GrokException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestGrokRecordReader { + private static GrokCompiler grokCompiler; + + @BeforeClass + public static void beforeClass() throws Exception { + try (final InputStream fis = new FileInputStream(new File("src/main/resources/default-grok-patterns.txt"))) { + grokCompiler = GrokCompiler.newInstance(); + grokCompiler.register(fis); + } + } + + @AfterClass + public static void afterClass() throws Exception { + grokCompiler = null; + } @Test public void testParseSingleLineLogMessages() throws GrokException, IOException, MalformedRecordException { try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/single-line-log-messages.txt"))) { - final Grok grok = new Grok(); - grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); - grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"); - + final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"); final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true); final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"}; @@ -76,9 +88,7 @@ public class TestGrokRecordReader { @Test public void testParseEmptyMessageWithStackTrace() throws GrokException, IOException, MalformedRecordException { - final Grok grok = new Grok(); - grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); - grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}"); + final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}"); final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election Notification Thread-1] o.a.n.LoggerClass \n" + "org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces"; @@ -105,12 +115,8 @@ public class TestGrokRecordReader { @Test public void testParseNiFiSampleLog() throws IOException, GrokException, MalformedRecordException { try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/nifi-log-sample.log"))) { - final Grok grok = new Grok(); - grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); - grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}"); - + final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}"); final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true); - final String[] logLevels = new String[] {"INFO", "INFO", "INFO", "WARN", "WARN"}; for (int i = 0; i < logLevels.length; i++) { @@ -131,12 +137,8 @@ public class TestGrokRecordReader { @Test public void testParseNiFiSampleMultilineWithStackTrace() throws IOException, GrokException, MalformedRecordException { try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log"))) { - final Grok grok = new Grok(); - grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); - grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?"); - + final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?"); final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true); - final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", "WARN", "WARN"}; for (int i = 0; i < logLevels.length; i++) { @@ -168,10 +170,7 @@ public class TestGrokRecordReader { @Test public void testParseStackTrace() throws GrokException, IOException, MalformedRecordException { try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/error-with-stack-trace.log"))) { - final Grok grok = new Grok(); - grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); - grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"); - + final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"); final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true); final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"}; @@ -217,14 +216,12 @@ public class TestGrokRecordReader { } @Test - public void testInheritNamedParameters() throws FileNotFoundException, IOException, GrokException, MalformedRecordException { + public void testInheritNamedParameters() throws IOException, GrokException, MalformedRecordException { final String syslogMsg = "May 22 15:58:23 my-host nifi[12345]:My Message"; final byte[] msgBytes = syslogMsg.getBytes(); try (final InputStream in = new ByteArrayInputStream(msgBytes)) { - final Grok grok = new Grok(); - grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); - grok.compile("%{SYSLOGBASE}%{GREEDYDATA:message}"); + final Grok grok = grokCompiler.compile("%{SYSLOGBASE}%{GREEDYDATA:message}"); final RecordSchema schema = GrokReader.createRecordSchema(grok); final List fieldNames = schema.getFieldNames(); @@ -265,9 +262,7 @@ public class TestGrokRecordReader { final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); try (final InputStream in = new ByteArrayInputStream(inputBytes)) { - final Grok grok = new Grok(); - grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); - grok.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}"); + final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}"); final RecordSchema schema = GrokReader.createRecordSchema(grok); final List fieldNames = schema.getFieldNames(); @@ -301,9 +296,7 @@ public class TestGrokRecordReader { final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); try (final InputStream in = new ByteArrayInputStream(inputBytes)) { - final Grok grok = new Grok(); - grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); - grok.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}"); + final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}"); final RecordSchema schema = GrokReader.createRecordSchema(grok); final List fieldNames = schema.getFieldNames();