From 7206318ecfb3ae973059bb850a2a7d87134bd0c4 Mon Sep 17 00:00:00 2001 From: James Wing Date: Mon, 14 Nov 2016 13:32:31 -0800 Subject: [PATCH] NIFI-3033 GenerateFlowFile Dynamic Properties This closes #1222. Signed-off-by: Koji Kawamura --- .../processors/standard/GenerateFlowFile.java | 33 ++++++++++++++++++- .../standard/TestGenerateFlowFile.java | 18 ++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java index f3a51eff84..b6f06cd697 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java @@ -21,12 +21,15 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -36,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -49,7 +53,11 @@ import org.apache.nifi.processor.util.StandardValidators; @SupportsBatching @Tags({"test", "random", "generate"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("This processor creates FlowFiles of random data and is used for load testing") +@CapabilityDescription("This processor creates FlowFiles with random data or custom content. GenerateFlowFile is useful" + + "for load testing, configuration, and simulation.") +@DynamicProperty(name = "Generated FlowFile attribute name", value = "Generated FlowFile attribute value", supportsExpressionLanguage = true, + description = "Specifies an attribute on generated FlowFiles defined by the Dynamic Property's key and value." + + " If Expression Language is used, evaluation will be performed only once per batch of generated FlowFiles.") public class GenerateFlowFile extends AbstractProcessor { private final AtomicReference data = new AtomicReference<>(); @@ -126,6 +134,18 @@ public class GenerateFlowFile extends AbstractProcessor { return descriptors; } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + @Override public Set getRelationships() { return relationships; @@ -183,6 +203,16 @@ public class GenerateFlowFile extends AbstractProcessor { data = this.data.get(); } + Map processorProperties = context.getProperties(); + Map generatedAttributes = new HashMap(); + for (final Map.Entry entry : processorProperties.entrySet()) { + PropertyDescriptor property = entry.getKey(); + if (property.isDynamic() && property.isExpressionLanguageSupported()) { + String dynamicValue = context.getProperty(property).evaluateAttributeExpressions().getValue(); + generatedAttributes.put(property.getName(), dynamicValue); + } + } + for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) { FlowFile flowFile = session.create(); if (data.length > 0) { @@ -193,6 +223,7 @@ public class GenerateFlowFile extends AbstractProcessor { } }); } + flowFile = session.putAllAttributes(flowFile, generatedAttributes); session.getProvenanceReporter().create(flowFile); session.transfer(flowFile, SUCCESS); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateFlowFile.java index d930270d3c..ca7de0c786 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateFlowFile.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import java.io.IOException; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; @@ -53,4 +54,21 @@ public class TestGenerateFlowFile { runner.assertNotValid(); } + @Test + public void testDynamicPropertiesToAttributes() throws IOException { + TestRunner runner = TestRunners.newTestRunner(new GenerateFlowFile()); + runner.setProperty(GenerateFlowFile.FILE_SIZE, "1B"); + runner.setProperty(GenerateFlowFile.DATA_FORMAT, GenerateFlowFile.DATA_FORMAT_TEXT); + runner.setProperty("plain.dynamic.property", "Plain Value"); + runner.setProperty("expression.dynamic.property", "${literal('Expression Value')}"); + runner.assertValid(); + + runner.run(); + + runner.assertTransferCount(GenerateFlowFile.SUCCESS, 1); + MockFlowFile generatedFlowFile = runner.getFlowFilesForRelationship(GenerateFlowFile.SUCCESS).get(0); + generatedFlowFile.assertAttributeEquals("plain.dynamic.property", "Plain Value"); + generatedFlowFile.assertAttributeEquals("expression.dynamic.property", "Expression Value"); + } + } \ No newline at end of file