From a5d630672a7b24ca1f60073182209b998172c0b3 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 28 Mar 2017 15:59:02 -0400 Subject: [PATCH] NIFI-3635: This closes #1631. Avoid using a static member variable for the 'Grok' object. Code cleanup Signed-off-by: joewitt --- .../nifi/processors/standard/ExtractGrok.java | 110 +++++++----------- .../processors/standard/TestExtractGrok.java | 16 +-- 2 files changed, 47 insertions(+), 79 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java index 116513cc21..2790dc93d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java @@ -43,7 +43,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; @@ -65,61 +64,59 @@ import java.util.concurrent.TimeUnit; @Tags({"grok", "log", "text", "parse", "delimit", "extract"}) @CapabilityDescription("Evaluates one or more Grok Expressions against the content of a FlowFile, " + - "adding the results as attributes or replacing the content of the FlowFile with a JSON " + - "notation of the matched content") + "adding the results as attributes or replacing the content of the FlowFile with a JSON " + + "notation of the matched content") @WritesAttributes({ - @WritesAttribute(attribute = "grok.XXX", description = "When operating in flowfile-attribute mode, each of the Grok identifier that is matched in the flowfile " + - "will be added as an attribute, prefixed with \"grok.\" For example," + - "if the grok identifier \"timestamp\" is matched, then the value will be added to an attribute named \"grok.timestamp\"")}) + @WritesAttribute(attribute = "grok.XXX", description = "When operating in flowfile-attribute mode, each of the Grok identifier that is matched in the flowfile " + + "will be added as an attribute, prefixed with \"grok.\" For example," + + "if the grok identifier \"timestamp\" is matched, then the value will be added to an attribute named \"grok.timestamp\"")}) public class ExtractGrok extends AbstractProcessor { - public static final String FLOWFILE_ATTRIBUTE = "flowfile-attribute"; public static final String FLOWFILE_CONTENT = "flowfile-content"; private static final String APPLICATION_JSON = "application/json"; - public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor - .Builder().name("Grok Expression") - .description("Grok expression") - .required(true) - .addValidator(validateGrokExpression()) - .build(); + public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder() + .name("Grok Expression") + .description("Grok expression") + .required(true) + .addValidator(validateGrokExpression()) + .build(); - public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor - .Builder().name("Grok Pattern file") - .description("Grok Pattern file definition") - .required(true) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); + public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor.Builder() + .name("Grok Pattern file") + .description("Grok Pattern file definition") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() - .name("Destination") - .description("Control if Grok output value is written as a new flowfile attributes, in this case " + - "each of the Grok identifier that is matched in the flowfile will be added as an attribute, " + - "prefixed with \"grok.\" or written in the flowfile content. Writing to flowfile content " + - "will overwrite any existing flowfile content.") + .name("Destination") + .description("Control if Grok output value is written as a new flowfile attributes, in this case " + + "each of the Grok identifier that is matched in the flowfile will be added as an attribute, " + + "prefixed with \"grok.\" or written in the flowfile content. Writing to flowfile content " + + "will overwrite any existing flowfile content.") + .required(true) + .allowableValues(FLOWFILE_ATTRIBUTE, FLOWFILE_CONTENT) + .defaultValue(FLOWFILE_ATTRIBUTE) + .build(); - .required(true) - .allowableValues(FLOWFILE_ATTRIBUTE, FLOWFILE_CONTENT) - .defaultValue(FLOWFILE_ATTRIBUTE) - .build(); + public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("The Character Set in which the file is encoded") + .required(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .build(); - public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor - .Builder().name("Character Set") - .description("The Character Set in which the file is encoded") - .required(true) - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .defaultValue("UTF-8") - .build(); - - public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor - .Builder().name("Maximum Buffer Size") - .description("Specifies the maximum amount of data to buffer (per file) in order to apply the Grok expressions. Files larger than the specified maximum will not be fully evaluated.") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE)) - .defaultValue("1 MB") - .build(); + public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Maximum Buffer Size") + .description("Specifies the maximum amount of data to buffer (per file) in order to apply the Grok expressions. Files larger than the specified maximum will not be fully evaluated.") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE)) + .defaultValue("1 MB") + .build(); public static final Relationship REL_MATCH = new Relationship.Builder() .name("matched") @@ -134,11 +131,9 @@ public class ExtractGrok extends AbstractProcessor { private final static List descriptors; private final static Set relationships; - - private final static Grok grok = Grok.EMPTY; + private volatile Grok grok = new Grok(); private final BlockingQueue bufferQueue = new LinkedBlockingQueue<>(); - static { final Set _relationships = new HashSet<>(); _relationships.add(REL_MATCH); @@ -154,7 +149,6 @@ public class ExtractGrok extends AbstractProcessor { descriptors = Collections.unmodifiableList(_descriptors); } - @Override public Set getRelationships() { return relationships; @@ -165,7 +159,6 @@ public class ExtractGrok extends AbstractProcessor { return descriptors; } - @OnStopped public void onStopped() { bufferQueue.clear(); @@ -173,18 +166,15 @@ public class ExtractGrok extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) throws GrokException { - - for (int i = 0; i < context.getMaxConcurrentTasks(); i++) { final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final byte[] buffer = new byte[maxBufferSize]; bufferQueue.add(buffer); } - + grok = new Grok(); grok.addPatternFromFile(context.getProperty(GROK_PATTERN_FILE).getValue()); grok.compile(context.getProperty(GROK_EXPRESSION).getValue()); - } @Override @@ -216,20 +206,18 @@ public class ExtractGrok extends AbstractProcessor { bufferQueue.offer(buffer); } - final Match gm = grok.match(contentString); gm.captures(); - if (gm.toMap().isEmpty()) { session.transfer(flowFile, REL_NO_MATCH); getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile}); return; } + final ObjectMapper objectMapper = new ObjectMapper(); switch (context.getProperty(DESTINATION).getValue()) { case FLOWFILE_ATTRIBUTE: - Map grokResults = new HashMap<>(); for (Map.Entry entry : gm.toMap().entrySet()) { if (null != entry.getValue()) { @@ -244,13 +232,10 @@ public class ExtractGrok extends AbstractProcessor { break; case FLOWFILE_CONTENT: - FlowFile conFlowfile = session.write(flowFile, new StreamCallback() { @Override public void process(InputStream in, OutputStream out) throws IOException { - try (OutputStream outputStream = new BufferedOutputStream(out)) { - outputStream.write(objectMapper.writeValueAsBytes(gm.toMap())); - } + out.write(objectMapper.writeValueAsBytes(gm.toMap())); } }); conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); @@ -258,15 +243,12 @@ public class ExtractGrok extends AbstractProcessor { session.transfer(conFlowfile, REL_MATCH); break; - } - } public static final Validator validateGrokExpression() { return new Validator() { - @Override public ValidationResult validate(String subject, String input, ValidationContext context) { @@ -290,10 +272,8 @@ public class ExtractGrok extends AbstractProcessor { } return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); - } }; } - } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java index 580b3080ec..b503c40639 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java @@ -31,9 +31,8 @@ import java.nio.file.Paths; public class TestExtractGrok { private TestRunner testRunner; - final static Path GROK_LOG_INPUT = Paths.get("src/test/resources/TestExtractGrok/apache.log"); - final static Path GROK_TEXT_INPUT = Paths.get("src/test/resources/TestExtractGrok/simple_text.log"); - + private final static Path GROK_LOG_INPUT = Paths.get("src/test/resources/TestExtractGrok/apache.log"); + private final static Path GROK_TEXT_INPUT = Paths.get("src/test/resources/TestExtractGrok/simple_text.log"); @Before public void init() { @@ -42,8 +41,6 @@ public class TestExtractGrok { @Test public void testExtractGrokWithMatchedContent() throws IOException { - - testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}"); testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); testRunner.enqueue(GROK_LOG_INPUT); @@ -59,13 +56,10 @@ public class TestExtractGrok { matched.assertAttributeEquals("grok.timestamp","07/Mar/2004:16:05:49 -0800"); matched.assertAttributeEquals("grok.request","/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables"); matched.assertAttributeEquals("grok.httpversion","1.1"); - } @Test public void testExtractGrokWithUnMatchedContent() throws IOException { - - testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{ADDRESS}"); testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); testRunner.enqueue(GROK_TEXT_INPUT); @@ -73,29 +67,23 @@ public class TestExtractGrok { testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_NO_MATCH); final MockFlowFile notMatched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_NO_MATCH).get(0); notMatched.assertContentEquals(GROK_TEXT_INPUT); - } @Test public void testExtractGrokWithNotFoundPatternFile() throws IOException { - testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}"); testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/toto_file"); testRunner.enqueue(GROK_LOG_INPUT); testRunner.assertNotValid(); - } @Test public void testExtractGrokWithBadGrokExpression() throws IOException { - testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{TOTO"); testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); testRunner.enqueue(GROK_LOG_INPUT); testRunner.assertNotValid(); - - } }