NIFI-5170 Upgrade Grok to version 0.1.9

This closes #2691

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Otto Fowler 2018-05-08 15:53:20 -04:00 committed by Mike Thomsen
parent 64356e0014
commit 61fe493786
10 changed files with 345 additions and 130 deletions

View File

@ -306,8 +306,8 @@
<artifactId>bval-jsr</artifactId>
</dependency>
<dependency>
<groupId>io.thekraken</groupId>
<artifactId>grok</artifactId>
<groupId>io.krakens</groupId>
<artifactId>java-grok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>

View File

@ -18,9 +18,10 @@
package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.thekraken.grok.api.Grok;
import io.thekraken.grok.api.Match;
import io.thekraken.grok.api.exception.GrokException;
import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.Match;
import io.krakens.grok.api.exception.GrokException;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -33,15 +34,13 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
@ -49,17 +48,22 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -80,18 +84,21 @@ public class ExtractGrok extends AbstractProcessor {
public static final String FLOWFILE_ATTRIBUTE = "flowfile-attribute";
public static final String FLOWFILE_CONTENT = "flowfile-content";
private static final String APPLICATION_JSON = "application/json";
private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder()
.name("Grok Expression")
.description("Grok expression")
.description("Grok expression. If other Grok expressions are referenced in this expression, they must be provided "
+ "in the Grok Pattern File if set or exist in the default Grok patterns")
.required(true)
.addValidator(validateGrokExpression())
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor.Builder()
.name("Grok Pattern file")
.description("Grok Pattern file definition")
.required(true)
.description("Grok Pattern file definition. This file will be loaded after the default Grok "
+ "patterns file. If not set, then only the Grok Expression and the default Grok patterns will be used.")
.required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build();
@ -145,7 +152,8 @@ public class ExtractGrok extends AbstractProcessor {
private final static List<PropertyDescriptor> descriptors;
private final static Set<Relationship> relationships;
private volatile Grok grok = new Grok();
private volatile GrokCompiler grokCompiler;
private volatile Grok grok;
private final BlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();
static {
@ -179,17 +187,73 @@ public class ExtractGrok extends AbstractProcessor {
bufferQueue.clear();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
Collection<ValidationResult> problems = new ArrayList<>();
// validate the grok expression against configuration
// if there is a GROK_PATTERN_FILE set we must be sure to register that so that it's
// patterns will be available to compile()
// we also have to make sure the default grok patterns are loaded
boolean namedCaptures = false;
if (validationContext.getProperty(NAMED_CAPTURES_ONLY).isSet()) {
namedCaptures = validationContext.getProperty(NAMED_CAPTURES_ONLY).asBoolean();
}
GrokCompiler grokCompiler = GrokCompiler.newInstance();
String subject = GROK_EXPRESSION.getName();
String input = validationContext.getProperty(GROK_EXPRESSION).getValue();
try {
try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(in);
}
if (validationContext.getProperty(GROK_PATTERN_FILE).isSet()) {
try (final InputStream in = new FileInputStream(new File(validationContext.getProperty(GROK_PATTERN_FILE).getValue()));
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
}
}
grok = grokCompiler.compile(input, namedCaptures);
} catch (final Exception e) {
problems.add(new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(false)
.explanation("Not a valid Grok Expression - " + e.getMessage())
.build());
return problems;
}
problems.add(new ValidationResult.Builder().subject(subject).input(input).valid(true).build());
return problems;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws GrokException {
public void onScheduled(final ProcessContext context) throws GrokException, IOException {
for (int i = 0; i < context.getMaxConcurrentTasks(); i++) {
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final byte[] buffer = new byte[maxBufferSize];
bufferQueue.add(buffer);
}
grok = new Grok();
grok.addPatternFromFile(context.getProperty(GROK_PATTERN_FILE).getValue());
grok.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean());
grokCompiler = GrokCompiler.newInstance();
try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(in);
}
if (context.getProperty(GROK_PATTERN_FILE).isSet()) {
try (final InputStream in = new FileInputStream(new File(context.getProperty(GROK_PATTERN_FILE).getValue()));
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
}
}
grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean());
}
@Override
@ -222,9 +286,9 @@ public class ExtractGrok extends AbstractProcessor {
}
final Match gm = grok.match(contentString);
gm.captures();
final Map<String,Object> captureMap = gm.capture();
if (gm.toMap().isEmpty()) {
if (captureMap.isEmpty()) {
session.transfer(flowFile, REL_NO_MATCH);
getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile});
return;
@ -234,7 +298,7 @@ public class ExtractGrok extends AbstractProcessor {
switch (context.getProperty(DESTINATION).getValue()) {
case FLOWFILE_ATTRIBUTE:
Map<String, String> grokResults = new HashMap<>();
for (Map.Entry<String, Object> entry : gm.toMap().entrySet()) {
for (Map.Entry<String, Object> entry : captureMap.entrySet()) {
if (null != entry.getValue()) {
grokResults.put("grok." + entry.getKey(), entry.getValue().toString());
}
@ -250,7 +314,7 @@ public class ExtractGrok extends AbstractProcessor {
FlowFile conFlowfile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
out.write(objectMapper.writeValueAsBytes(gm.toMap()));
out.write(objectMapper.writeValueAsBytes(captureMap));
}
});
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
@ -260,28 +324,4 @@ public class ExtractGrok extends AbstractProcessor {
break;
}
}
public static final Validator validateGrokExpression() {
return new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
Grok grok = new Grok();
try {
grok.compile(input);
} catch (GrokException | java.util.regex.PatternSyntaxException e) {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(false)
.explanation("Not a valid Grok Expression - " + e.getMessage())
.build();
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
};
}
}

View File

@ -0,0 +1,115 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)|FINE|FINER|FINEST|CONFIG
# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US_MONTH_DAY_YEAR %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_US_YEAR_MONTH_DAY %{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY}
DATE_US %{DATE_US_MONTH_DAY_YEAR}|%{DATE_US_YEAR_MONTH_DAY}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}
# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Shortcuts
QS %{QUOTEDSTRING}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

View File

@ -39,6 +39,13 @@ public class TestExtractGrok {
testRunner = TestRunners.newTestRunner(ExtractGrok.class);
}
@Test
public void testExtractGrokWithMissingPattern() throws Exception {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{FOOLOG}");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.assertNotValid();
}
@Test
public void testExtractGrokWithMatchedContent() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
@ -60,7 +67,7 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithUnMatchedContent() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{ADDRESS}");
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{URI}");
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_TEXT_INPUT);
testRunner.run();

View File

@ -359,9 +359,9 @@
</exclusions>
</dependency>
<dependency>
<groupId>io.thekraken</groupId>
<artifactId>grok</artifactId>
<version>0.1.5</version>
<groupId>io.krakens</groupId>
<artifactId>java-grok</artifactId>
<version>0.1.9</version>
<exclusions>
<exclusion>
<groupId>com.google.code</groupId>

View File

@ -77,9 +77,9 @@
<version>2.6</version>
</dependency>
<dependency>
<groupId>io.thekraken</groupId>
<artifactId>grok</artifactId>
<version>0.1.5</version>
<groupId>io.krakens</groupId>
<artifactId>java-grok</artifactId>
<version>0.1.9</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>

View File

@ -17,18 +17,45 @@
package org.apache.nifi.grok;
import io.krakens.grok.api.GrokCompiler;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import io.thekraken.grok.api.Grok;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
public class GrokExpressionValidator implements Validator {
private GrokCompiler grokCompiler;
private String patternFileName;
public GrokExpressionValidator(String patternFileName, GrokCompiler compiler) {
this.patternFileName = patternFileName;
this.grokCompiler = compiler;
}
public GrokExpressionValidator() {
this.grokCompiler = GrokCompiler.newInstance();
}
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
try {
new Grok().compile(input);
try (final InputStream in = getClass().getResourceAsStream(GrokReader.DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(in);
}
if (patternFileName != null) {
try (final InputStream in = new FileInputStream(new File(patternFileName));
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
}
}
grokCompiler.compile(input);
} catch (final Exception e) {
return new ValidationResult.Builder()
.input(input)

View File

@ -17,23 +17,17 @@
package org.apache.nifi.grok;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.GrokUtils;
import io.krakens.grok.api.exception.GrokException;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
@ -51,9 +45,18 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import io.thekraken.grok.api.Grok;
import io.thekraken.grok.api.GrokUtils;
import io.thekraken.grok.api.exception.GrokException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", "reader", "regex", "pattern", "logstash"})
@CapabilityDescription("Provides a mechanism for reading unstructured text data, such as log files, and structuring the data "
@ -65,12 +68,13 @@ import io.thekraken.grok.api.exception.GrokException;
+ "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String). "
+ "Assuming that the schema includes a '_raw' field of type String, the raw message will be included in the Record.")
public class GrokReader extends SchemaRegistryService implements RecordReaderFactory {
private volatile GrokCompiler grokCompiler;
private volatile Grok grok;
private volatile boolean appendUnmatchedLine;
private volatile RecordSchema recordSchema;
private volatile RecordSchema recordSchemaFromGrok;
private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
static final AllowableValue APPEND_TO_PREVIOUS_MESSAGE = new AllowableValue("append-to-previous-message", "Append to Previous Message",
"The line of text that does not match the Grok Expression will be appended to the last field of the prior message.");
@ -94,8 +98,9 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder()
.name("Grok Expression")
.description("Specifies the format of a log line in Grok format. This allows the Record Reader to understand how to parse each log line. "
+ "If a line in the log file does not match this pattern, the line will be assumed to belong to the previous log message.")
.addValidator(new GrokExpressionValidator())
+ "If a line in the log file does not match this pattern, the line will be assumed to belong to the previous log message."
+ "If other Grok expressions are referenced by this expression, they need to be supplied in the Grok Pattern File")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true)
.build();
@ -120,18 +125,21 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@OnEnabled
public void preCompile(final ConfigurationContext context) throws GrokException, IOException {
grok = new Grok();
grokCompiler = GrokCompiler.newInstance();
try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
grok.addPatternFromReader(reader);
grokCompiler.register(reader);
}
if (context.getProperty(PATTERN_FILE).isSet()) {
grok.addPatternFromFile(context.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue());
try (final InputStream in = new FileInputStream(context.getProperty(PATTERN_FILE)
.evaluateAttributeExpressions().getValue()); final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
}
}
grok.compile(context.getProperty(GROK_EXPRESSION).getValue());
grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue());
appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue());
@ -145,6 +153,32 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
}
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
ArrayList<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
// validate the grok expression against configuration
GrokCompiler grokCompiler = GrokCompiler.newInstance();
String subject = GROK_EXPRESSION.getName();
String input = validationContext.getProperty(GROK_EXPRESSION).getValue();
GrokExpressionValidator validator;
try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
final Reader reader = new InputStreamReader(in)) {
grokCompiler.register(reader);
} catch (IOException e) {
results.add(new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation("Unable to load default patterns: " + e.getMessage())
.build());
}
validator = new GrokExpressionValidator(validationContext.getProperty(PATTERN_FILE).getValue(),grokCompiler);
results.add(validator.validate(subject,input,validationContext));
return results;
}
static RecordSchema createRecordSchema(final Grok grok) {
final List<RecordField> fields = new ArrayList<>();
@ -159,10 +193,11 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
}
private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List<RecordField> fields) {
final Set<String> namedGroups = GrokUtils.getNameGroups(GrokUtils.GROK_PATTERN.pattern());
while (grokExpression.length() > 0) {
final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression);
if (matcher.find()) {
final Map<String, String> extractedGroups = GrokUtils.namedGroups(matcher, grokExpression);
final Map<String, String> extractedGroups = GrokUtils.namedGroups(matcher, namedGroups);
final String subName = extractedGroups.get("subname");
if (subName == null) {

View File

@ -37,8 +37,8 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import io.thekraken.grok.api.Grok;
import io.thekraken.grok.api.Match;
import io.krakens.grok.api.Grok;
import io.krakens.grok.api.Match;
public class GrokRecordReader implements RecordReader {
private final BufferedReader reader;
@ -90,8 +90,7 @@ public class GrokRecordReader implements RecordReader {
}
final Match match = grok.match(line);
match.captures();
valueMap = match.toMap();
valueMap = match.capture();
}
if (iterations == 0 && nextLine != null) {
@ -104,8 +103,7 @@ public class GrokRecordReader implements RecordReader {
final StringBuilder trailingText = new StringBuilder();
while ((nextLine = reader.readLine()) != null) {
final Match nextLineMatch = grok.match(nextLine);
nextLineMatch.captures();
final Map<String, Object> nextValueMap = nextLineMatch.toMap();
final Map<String, Object> nextValueMap = nextLineMatch.capture();
if (nextValueMap.isEmpty()) {
// next line did not match. Check if it indicates a Stack Trace. If so, read until
// the stack trace ends. Otherwise, append the next line to the last field in the record.

View File

@ -17,37 +17,49 @@
package org.apache.nifi.grok;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.exception.GrokException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Test;
import io.thekraken.grok.api.Grok;
import io.thekraken.grok.api.exception.GrokException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestGrokRecordReader {
private static GrokCompiler grokCompiler;
@BeforeClass
public static void beforeClass() throws Exception {
try (final InputStream fis = new FileInputStream(new File("src/main/resources/default-grok-patterns.txt"))) {
grokCompiler = GrokCompiler.newInstance();
grokCompiler.register(fis);
}
}
@AfterClass
public static void afterClass() throws Exception {
grokCompiler = null;
}
@Test
public void testParseSingleLineLogMessages() throws GrokException, IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/single-line-log-messages.txt"))) {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"};
@ -76,9 +88,7 @@ public class TestGrokRecordReader {
@Test
public void testParseEmptyMessageWithStackTrace() throws GrokException, IOException, MalformedRecordException {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election Notification Thread-1] o.a.n.LoggerClass \n"
+ "org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces";
@ -105,12 +115,8 @@ public class TestGrokRecordReader {
@Test
public void testParseNiFiSampleLog() throws IOException, GrokException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/nifi-log-sample.log"))) {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
final String[] logLevels = new String[] {"INFO", "INFO", "INFO", "WARN", "WARN"};
for (int i = 0; i < logLevels.length; i++) {
@ -131,12 +137,8 @@ public class TestGrokRecordReader {
@Test
public void testParseNiFiSampleMultilineWithStackTrace() throws IOException, GrokException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log"))) {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", "WARN", "WARN"};
for (int i = 0; i < logLevels.length; i++) {
@ -168,10 +170,7 @@ public class TestGrokRecordReader {
@Test
public void testParseStackTrace() throws GrokException, IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/grok/error-with-stack-trace.log"))) {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
@ -217,14 +216,12 @@ public class TestGrokRecordReader {
}
@Test
public void testInheritNamedParameters() throws FileNotFoundException, IOException, GrokException, MalformedRecordException {
public void testInheritNamedParameters() throws IOException, GrokException, MalformedRecordException {
final String syslogMsg = "May 22 15:58:23 my-host nifi[12345]:My Message";
final byte[] msgBytes = syslogMsg.getBytes();
try (final InputStream in = new ByteArrayInputStream(msgBytes)) {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{SYSLOGBASE}%{GREEDYDATA:message}");
final Grok grok = grokCompiler.compile("%{SYSLOGBASE}%{GREEDYDATA:message}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
@ -265,9 +262,7 @@ public class TestGrokRecordReader {
final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
@ -301,9 +296,7 @@ public class TestGrokRecordReader {
final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = new Grok();
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();