NIFI-3635: This closes #1631. Avoid using a static member variable for the 'Grok' object. Code cleanup

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-03-28 15:59:02 -04:00 committed by joewitt
parent 35f4f48f37
commit a5d630672a
2 changed files with 47 additions and 79 deletions

View File

@ -43,7 +43,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
@ -65,61 +64,59 @@ import java.util.concurrent.TimeUnit;
@Tags({"grok", "log", "text", "parse", "delimit", "extract"})
@CapabilityDescription("Evaluates one or more Grok Expressions against the content of a FlowFile, " +
"adding the results as attributes or replacing the content of the FlowFile with a JSON " +
"notation of the matched content")
"adding the results as attributes or replacing the content of the FlowFile with a JSON " +
"notation of the matched content")
@WritesAttributes({
@WritesAttribute(attribute = "grok.XXX", description = "When operating in flowfile-attribute mode, each of the Grok identifier that is matched in the flowfile " +
"will be added as an attribute, prefixed with \"grok.\" For example," +
"if the grok identifier \"timestamp\" is matched, then the value will be added to an attribute named \"grok.timestamp\"")})
@WritesAttribute(attribute = "grok.XXX", description = "When operating in flowfile-attribute mode, each of the Grok identifier that is matched in the flowfile " +
"will be added as an attribute, prefixed with \"grok.\" For example," +
"if the grok identifier \"timestamp\" is matched, then the value will be added to an attribute named \"grok.timestamp\"")})
public class ExtractGrok extends AbstractProcessor {
public static final String FLOWFILE_ATTRIBUTE = "flowfile-attribute";
public static final String FLOWFILE_CONTENT = "flowfile-content";
private static final String APPLICATION_JSON = "application/json";
public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor
.Builder().name("Grok Expression")
.description("Grok expression")
.required(true)
.addValidator(validateGrokExpression())
.build();
public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder()
.name("Grok Expression")
.description("Grok expression")
.required(true)
.addValidator(validateGrokExpression())
.build();
public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor
.Builder().name("Grok Pattern file")
.description("Grok Pattern file definition")
.required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build();
public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor.Builder()
.name("Grok Pattern file")
.description("Grok Pattern file definition")
.required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build();
public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
.name("Destination")
.description("Control if Grok output value is written as a new flowfile attributes, in this case " +
"each of the Grok identifier that is matched in the flowfile will be added as an attribute, " +
"prefixed with \"grok.\" or written in the flowfile content. Writing to flowfile content " +
"will overwrite any existing flowfile content.")
.name("Destination")
.description("Control if Grok output value is written as a new flowfile attributes, in this case " +
"each of the Grok identifier that is matched in the flowfile will be added as an attribute, " +
"prefixed with \"grok.\" or written in the flowfile content. Writing to flowfile content " +
"will overwrite any existing flowfile content.")
.required(true)
.allowableValues(FLOWFILE_ATTRIBUTE, FLOWFILE_CONTENT)
.defaultValue(FLOWFILE_ATTRIBUTE)
.build();
.required(true)
.allowableValues(FLOWFILE_ATTRIBUTE, FLOWFILE_CONTENT)
.defaultValue(FLOWFILE_ATTRIBUTE)
.build();
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the file is encoded")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor
.Builder().name("Character Set")
.description("The Character Set in which the file is encoded")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor
.Builder().name("Maximum Buffer Size")
.description("Specifies the maximum amount of data to buffer (per file) in order to apply the Grok expressions. Files larger than the specified maximum will not be fully evaluated.")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE))
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Buffer Size")
.description("Specifies the maximum amount of data to buffer (per file) in order to apply the Grok expressions. Files larger than the specified maximum will not be fully evaluated.")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE))
.defaultValue("1 MB")
.build();
public static final Relationship REL_MATCH = new Relationship.Builder()
.name("matched")
@ -134,11 +131,9 @@ public class ExtractGrok extends AbstractProcessor {
private final static List<PropertyDescriptor> descriptors;
private final static Set<Relationship> relationships;
private final static Grok grok = Grok.EMPTY;
private volatile Grok grok = new Grok();
private final BlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();
static {
final Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_MATCH);
@ -154,7 +149,6 @@ public class ExtractGrok extends AbstractProcessor {
descriptors = Collections.unmodifiableList(_descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
@ -165,7 +159,6 @@ public class ExtractGrok extends AbstractProcessor {
return descriptors;
}
@OnStopped
public void onStopped() {
bufferQueue.clear();
@ -173,18 +166,15 @@ public class ExtractGrok extends AbstractProcessor {
@OnScheduled
// Called once before the processor starts running tasks. Pre-allocates one reusable
// read buffer per concurrent task (handed out via bufferQueue in onTrigger), then
// builds a fresh per-schedule Grok instance from the configured pattern file and
// compiles the configured expression. Creating Grok here (instance field, not static)
// is the point of this change: state is no longer shared across processor instances.
// @throws GrokException if the pattern file cannot be parsed or the expression
//         does not compile (property validators should normally catch this earlier).
public void onScheduled(final ProcessContext context) throws GrokException {
for (int i = 0; i < context.getMaxConcurrentTasks(); i++) {
// Buffer size is capped by the Maximum Buffer Size property (bytes);
// content beyond this limit is not evaluated against the expression.
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final byte[] buffer = new byte[maxBufferSize];
bufferQueue.add(buffer);
}
grok = new Grok();
// Load user-supplied pattern definitions first so the expression can reference them.
grok.addPatternFromFile(context.getProperty(GROK_PATTERN_FILE).getValue());
grok.compile(context.getProperty(GROK_EXPRESSION).getValue());
}
@Override
@ -216,20 +206,18 @@ public class ExtractGrok extends AbstractProcessor {
bufferQueue.offer(buffer);
}
final Match gm = grok.match(contentString);
gm.captures();
if (gm.toMap().isEmpty()) {
session.transfer(flowFile, REL_NO_MATCH);
getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile});
return;
}
final ObjectMapper objectMapper = new ObjectMapper();
switch (context.getProperty(DESTINATION).getValue()) {
case FLOWFILE_ATTRIBUTE:
Map<String, String> grokResults = new HashMap<>();
for (Map.Entry<String, Object> entry : gm.toMap().entrySet()) {
if (null != entry.getValue()) {
@ -244,13 +232,10 @@ public class ExtractGrok extends AbstractProcessor {
break;
case FLOWFILE_CONTENT:
FlowFile conFlowfile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(objectMapper.writeValueAsBytes(gm.toMap()));
}
out.write(objectMapper.writeValueAsBytes(gm.toMap()));
}
});
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
@ -258,15 +243,12 @@ public class ExtractGrok extends AbstractProcessor {
session.transfer(conFlowfile, REL_MATCH);
break;
}
}
public static final Validator validateGrokExpression() {
return new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
@ -290,10 +272,8 @@ public class ExtractGrok extends AbstractProcessor {
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
};
}
}

View File

@ -31,9 +31,8 @@ import java.nio.file.Paths;
public class TestExtractGrok {
private TestRunner testRunner;
final static Path GROK_LOG_INPUT = Paths.get("src/test/resources/TestExtractGrok/apache.log");
final static Path GROK_TEXT_INPUT = Paths.get("src/test/resources/TestExtractGrok/simple_text.log");
private final static Path GROK_LOG_INPUT = Paths.get("src/test/resources/TestExtractGrok/apache.log");
private final static Path GROK_TEXT_INPUT = Paths.get("src/test/resources/TestExtractGrok/simple_text.log");
@Before
public void init() {
@ -42,8 +41,6 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithMatchedContent() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_LOG_INPUT);
@ -59,13 +56,10 @@ public class TestExtractGrok {
matched.assertAttributeEquals("grok.timestamp","07/Mar/2004:16:05:49 -0800");
matched.assertAttributeEquals("grok.request","/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables");
matched.assertAttributeEquals("grok.httpversion","1.1");
}
@Test
public void testExtractGrokWithUnMatchedContent() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{ADDRESS}");
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_TEXT_INPUT);
@ -73,29 +67,23 @@ public class TestExtractGrok {
testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_NO_MATCH);
final MockFlowFile notMatched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_NO_MATCH).get(0);
notMatched.assertContentEquals(GROK_TEXT_INPUT);
}
@Test
// Verifies that pointing GROK_PATTERN_FILE at a nonexistent path fails validation
// (FILE_EXISTS_VALIDATOR on the property) before the processor ever runs —
// assertNotValid() checks configuration validity, so the enqueued FlowFile is
// never processed.
public void testExtractGrokWithNotFoundPatternFile() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
// Deliberately nonexistent file: "toto_file" is not present under the test resources.
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/toto_file");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.assertNotValid();
}
@Test
// Verifies that a syntactically broken Grok expression ("%{TOTO" — unterminated
// identifier) is rejected by the custom validator returned from
// ExtractGrok.validateGrokExpression(), so the processor configuration is invalid
// and the enqueued FlowFile is never processed.
public void testExtractGrokWithBadGrokExpression() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{TOTO");
testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.assertNotValid();
}
}