NIFI-3033 GenerateFlowFile Dynamic Properties

This closes #1222.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
James Wing 2016-11-14 13:32:31 -08:00 committed by Koji Kawamura
parent 06d7ecd324
commit 7206318ecf
2 changed files with 50 additions and 1 deletions

View File

@ -21,12 +21,15 @@ import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -36,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
@ -49,7 +53,11 @@ import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching @SupportsBatching
@Tags({"test", "random", "generate"}) @Tags({"test", "random", "generate"})
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("This processor creates FlowFiles of random data and is used for load testing") @CapabilityDescription("This processor creates FlowFiles with random data or custom content. GenerateFlowFile is useful" +
"for load testing, configuration, and simulation.")
@DynamicProperty(name = "Generated FlowFile attribute name", value = "Generated FlowFile attribute value", supportsExpressionLanguage = true,
description = "Specifies an attribute on generated FlowFiles defined by the Dynamic Property's key and value." +
" If Expression Language is used, evaluation will be performed only once per batch of generated FlowFiles.")
public class GenerateFlowFile extends AbstractProcessor { public class GenerateFlowFile extends AbstractProcessor {
private final AtomicReference<byte[]> data = new AtomicReference<>(); private final AtomicReference<byte[]> data = new AtomicReference<>();
@ -126,6 +134,18 @@ public class GenerateFlowFile extends AbstractProcessor {
return descriptors; return descriptors;
} }
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
return relationships; return relationships;
@ -183,6 +203,16 @@ public class GenerateFlowFile extends AbstractProcessor {
data = this.data.get(); data = this.data.get();
} }
Map<PropertyDescriptor, String> processorProperties = context.getProperties();
Map<String, String> generatedAttributes = new HashMap<String, String>();
for (final Map.Entry<PropertyDescriptor, String> entry : processorProperties.entrySet()) {
PropertyDescriptor property = entry.getKey();
if (property.isDynamic() && property.isExpressionLanguageSupported()) {
String dynamicValue = context.getProperty(property).evaluateAttributeExpressions().getValue();
generatedAttributes.put(property.getName(), dynamicValue);
}
}
for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) { for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
if (data.length > 0) { if (data.length > 0) {
@ -193,6 +223,7 @@ public class GenerateFlowFile extends AbstractProcessor {
} }
}); });
} }
flowFile = session.putAllAttributes(flowFile, generatedAttributes);
session.getProvenanceReporter().create(flowFile); session.getProvenanceReporter().create(flowFile);
session.transfer(flowFile, SUCCESS); session.transfer(flowFile, SUCCESS);

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import java.io.IOException; import java.io.IOException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Test; import org.junit.Test;
@ -53,4 +54,21 @@ public class TestGenerateFlowFile {
runner.assertNotValid(); runner.assertNotValid();
} }
@Test
public void testDynamicPropertiesToAttributes() throws IOException {
TestRunner runner = TestRunners.newTestRunner(new GenerateFlowFile());
runner.setProperty(GenerateFlowFile.FILE_SIZE, "1B");
runner.setProperty(GenerateFlowFile.DATA_FORMAT, GenerateFlowFile.DATA_FORMAT_TEXT);
runner.setProperty("plain.dynamic.property", "Plain Value");
runner.setProperty("expression.dynamic.property", "${literal('Expression Value')}");
runner.assertValid();
runner.run();
runner.assertTransferCount(GenerateFlowFile.SUCCESS, 1);
MockFlowFile generatedFlowFile = runner.getFlowFilesForRelationship(GenerateFlowFile.SUCCESS).get(0);
generatedFlowFile.assertAttributeEquals("plain.dynamic.property", "Plain Value");
generatedFlowFile.assertAttributeEquals("expression.dynamic.property", "Expression Value");
}
} }