diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index 516c8a44b7..9848a3d856 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -290,4 +291,25 @@ public class MockFlowFile implements FlowFileRecord { public long getQueueDateIndex() { return 0; } + public boolean isAttributeEqual(final String attributeName, final String expectedValue) { + // unknown attribute name, so cannot be equal. + if (attributes.containsKey(attributeName) == false) + return false; + + String value = attributes.get(attributeName); + return Objects.equals(expectedValue, value); + } + + public boolean isContentEqual(String expected) { + return isContentEqual(expected, Charset.forName("UTF-8")); + } + + public boolean isContentEqual(String expected, final Charset charset) { + final String value = new String(this.data, charset); + return Objects.equals(expected, value); + } + + public boolean isContentEqual(final byte[] expected) { + return Arrays.equals(expected, this.data); + } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 138524df16..2a1451ab6d 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -42,6 +42,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; + import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; @@ -855,4 +857,38 @@ public class StandardProcessorTestRunner implements TestRunner { return variableRegistry.removeVariable(new VariableDescriptor.Builder(name).build()); } + + /** + * Asserts that all FlowFiles meet all conditions. + * + * @param relationshipName relationship name + * @param predicate conditions + */ + @Override + public void assertAllConditionsMet(final String relationshipName, Predicate predicate) { + assertAllConditionsMet(new Relationship.Builder().name(relationshipName).build(), predicate); + } + + /** + * Asserts that all FlowFiles meet all conditions. + * + * @param relationship relationship + * @param predicate conditions + */ + @Override + public void assertAllConditionsMet(final Relationship relationship, Predicate predicate) { + + if (predicate==null) + Assert.fail("predicate cannot be null"); + + final List flowFiles = getFlowFilesForRelationship(relationship); + + if (flowFiles.isEmpty()) + Assert.fail("Relationship " + relationship.getName() + " does not contain any FlowFile"); + + for (MockFlowFile flowFile : flowFiles) { + if (predicate.test(flowFile)==false) + Assert.fail("FlowFile " + flowFile + " does not meet all condition"); + } + } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 78d4d008f8..63b7781cf4 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.nio.file.Path; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -932,4 +933,20 @@ public interface TestRunner { * @throws NullPointerException if the name is null */ String removeVariable(String name); + + /** + * Asserts that all FlowFiles meet all conditions. + * + * @param relationshipName relationship name + * @param predicate conditions + */ + void assertAllConditionsMet(final String relationshipName, Predicate predicate); + + /** + * Asserts that all FlowFiles meet all conditions. + * + * @param relationship relationship + * @param predicate conditions + */ + void assertAllConditionsMet(final Relationship relationship, Predicate predicate); } diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java index c65a7ba123..c5776d90ed 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java @@ -20,8 +20,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.function.Predicate; + import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; @@ -53,6 +57,45 @@ public class TestStandardProcessorTestRunner { assertEquals(1, proc.getOnStoppedCallsWithoutContext()); } + @Test + public void testAllConditionsMet() { + TestRunner runner = new StandardProcessorTestRunner(new GoodProcessor()); + + final Map attributes = new HashMap<>(); + attributes.put("GROUP_ATTRIBUTE_KEY", "1"); + attributes.put("KeyB", "hihii"); + runner.enqueue("1,hello\n1,good-bye".getBytes(), attributes); + + runner.run(); + runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 1); + + runner.assertAllConditionsMet("success", + mff -> mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1") && mff.isContentEqual("1,hello\n1,good-bye") + ); + } + + @Test + public void testAllConditionsMetComplex() { + TestRunner runner = new StandardProcessorTestRunner(new GoodProcessor()); + + final Map attributes = new HashMap<>(); + attributes.put("GROUP_ATTRIBUTE_KEY", "1"); + attributes.put("KeyB", "hihii"); + runner.enqueue("1,hello\n1,good-bye".getBytes(), attributes); + + attributes.clear(); + attributes.put("age", "34"); + runner.enqueue("May Andersson".getBytes(), attributes); + + runner.run(); + runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 2); + + Predicate firstPredicate = mff -> mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1"); + Predicate either = firstPredicate.or(mff -> mff.isAttributeEqual("age", "34")); + + runner.assertAllConditionsMet("success", either); + } + @Test public void testNumThreads() { final ProcessorWithOnStop proc = new ProcessorWithOnStop(); @@ -186,4 +229,39 @@ public class TestStandardProcessorTestRunner { counter++; } } + + private static class GoodProcessor extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from ...") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("... execution failed. Incoming FlowFile will be penalized and routed to this relationship") + .build(); + + private final Set relationships; + + public GoodProcessor() { + final Set r = new HashSet<>(); + r.add(REL_SUCCESS); + r.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(r); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + for( FlowFile incoming : session.get(20)) { + session.transfer(incoming, REL_SUCCESS); + } + } + } }