NIFI-5454: Added EL support and copy.index attribute to DuplicateFlowFile

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2917.
This commit is contained in:
Matthew Burgess 2018-07-25 13:55:10 -04:00 committed by Pierre Villard
parent 0ad30e188f
commit 692943f016
2 changed files with 45 additions and 6 deletions

View File

@ -24,9 +24,12 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -39,14 +42,22 @@ import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
@Tags({"test", "load", "duplicate"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile")
@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile. The original FlowFile as well as all "
+ "generated copies are sent to the 'success' relationship. In addition, each FlowFile gets an attribute 'copy.index' set to the copy number, where the original FlowFile gets "
+ "a value of zero, and all copies receive incremented integer values.")
@WritesAttributes({
@WritesAttribute(attribute = "copy.index", description = "A zero-based incrementing integer value based on which copy the FlowFile is.")
})
public class DuplicateFlowFile extends AbstractProcessor {
public static final String COPY_INDEX_ATTRIBUTE = "copy.index";
static final PropertyDescriptor NUM_COPIES = new PropertyDescriptor.Builder()
.name("Number of Copies")
.displayName("Number of Copies")
.description("Specifies how many copies of each incoming FlowFile will be made")
.required(true)
.expressionLanguageSupported(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("100")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
@ -68,16 +79,18 @@ public class DuplicateFlowFile extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
for (int i=0; i < context.getProperty(NUM_COPIES).asInteger(); i++) {
final FlowFile copy = session.clone(flowFile);
for (int i = 1; i <= context.getProperty(NUM_COPIES).evaluateAttributeExpressions(flowFile).asInteger(); i++) {
FlowFile copy = session.clone(flowFile);
copy = session.putAttribute(copy, COPY_INDEX_ATTRIBUTE, Integer.toString(i));
session.transfer(copy, REL_SUCCESS);
}
flowFile = session.putAttribute(flowFile, COPY_INDEX_ATTRIBUTE, "0");
session.transfer(flowFile, REL_SUCCESS);
}

View File

@ -16,20 +16,46 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import static org.apache.nifi.processors.standard.DuplicateFlowFile.COPY_INDEX_ATTRIBUTE;
public class TestDuplicateFlowFile {
@Test
public void test() {
final int numCopies = 100;
final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class);
runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100");
runner.setProperty(DuplicateFlowFile.NUM_COPIES, Integer.toString(numCopies));
runner.enqueue("hello".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, numCopies + 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DuplicateFlowFile.REL_SUCCESS);
// copy.index starts with 1, original has copy.index = 0 but is transferred last
for (int i = 1; i <= numCopies; i++) {
flowFiles.get(i - 1).assertAttributeEquals(COPY_INDEX_ATTRIBUTE, Integer.toString(i));
}
flowFiles.get(numCopies).assertAttributeEquals(COPY_INDEX_ATTRIBUTE, "0");
}
@Test
public void testNumberOfCopiesEL() {
final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class);
runner.setProperty(DuplicateFlowFile.NUM_COPIES, "${num.copies}");
runner.enqueue("hello".getBytes(), new HashMap<String, String>() {{
put("num.copies", "100");
}});
runner.run();
runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 101);
}