nifi-1214d Mock Framework should allow order-independent assumptions on FlowFiles.

This closes #1033

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Toivo Adams 2016-09-20 11:35:07 +03:00 committed by jpercivall
parent b693a4a561
commit ad3d63d204
4 changed files with 153 additions and 0 deletions

View File

@ -29,6 +29,7 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
@ -290,4 +291,25 @@ public class MockFlowFile implements FlowFileRecord {
public long getQueueDateIndex() { public long getQueueDateIndex() {
return 0; return 0;
} }
public boolean isAttributeEqual(final String attributeName, final String expectedValue) {
// unknown attribute name, so cannot be equal.
if (attributes.containsKey(attributeName) == false)
return false;
String value = attributes.get(attributeName);
return Objects.equals(expectedValue, value);
}
public boolean isContentEqual(String expected) {
return isContentEqual(expected, Charset.forName("UTF-8"));
}
public boolean isContentEqual(String expected, final Charset charset) {
final String value = new String(this.data, charset);
return Objects.equals(expected, value);
}
public boolean isContentEqual(final byte[] expected) {
return Arrays.equals(expected, this.data);
}
} }

View File

@ -42,6 +42,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@ -855,4 +857,38 @@ public class StandardProcessorTestRunner implements TestRunner {
return variableRegistry.removeVariable(new VariableDescriptor.Builder(name).build()); return variableRegistry.removeVariable(new VariableDescriptor.Builder(name).build());
} }
/**
* Asserts that all FlowFiles meet all conditions.
*
* @param relationshipName relationship name
* @param predicate conditions
*/
@Override
public void assertAllConditionsMet(final String relationshipName, Predicate<MockFlowFile> predicate) {
assertAllConditionsMet(new Relationship.Builder().name(relationshipName).build(), predicate);
}
/**
* Asserts that all FlowFiles meet all conditions.
*
* @param relationship relationship
* @param predicate conditions
*/
@Override
public void assertAllConditionsMet(final Relationship relationship, Predicate<MockFlowFile> predicate) {
if (predicate==null)
Assert.fail("predicate cannot be null");
final List<MockFlowFile> flowFiles = getFlowFilesForRelationship(relationship);
if (flowFiles.isEmpty())
Assert.fail("Relationship " + relationship.getName() + " does not contain any FlowFile");
for (MockFlowFile flowFile : flowFiles) {
if (predicate.test(flowFile)==false)
Assert.fail("FlowFile " + flowFile + " does not meet all condition");
}
}
} }

View File

@ -21,6 +21,7 @@ import java.io.InputStream;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Predicate;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
@ -932,4 +933,20 @@ public interface TestRunner {
* @throws NullPointerException if the name is null * @throws NullPointerException if the name is null
*/ */
String removeVariable(String name); String removeVariable(String name);
/**
* Asserts that all FlowFiles meet all conditions.
*
* @param relationshipName relationship name
* @param predicate conditions
*/
void assertAllConditionsMet(final String relationshipName, Predicate<MockFlowFile> predicate);
/**
* Asserts that all FlowFiles meet all conditions.
*
* @param relationship relationship
* @param predicate conditions
*/
void assertAllConditionsMet(final Relationship relationship, Predicate<MockFlowFile> predicate);
} }

View File

@ -20,8 +20,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Predicate;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
@ -53,6 +57,45 @@ public class TestStandardProcessorTestRunner {
assertEquals(1, proc.getOnStoppedCallsWithoutContext()); assertEquals(1, proc.getOnStoppedCallsWithoutContext());
} }
@Test
public void testAllConditionsMet() {
TestRunner runner = new StandardProcessorTestRunner(new GoodProcessor());
final Map<String, String> attributes = new HashMap<>();
attributes.put("GROUP_ATTRIBUTE_KEY", "1");
attributes.put("KeyB", "hihii");
runner.enqueue("1,hello\n1,good-bye".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 1);
runner.assertAllConditionsMet("success",
mff -> mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1") && mff.isContentEqual("1,hello\n1,good-bye")
);
}
@Test
public void testAllConditionsMetComplex() {
TestRunner runner = new StandardProcessorTestRunner(new GoodProcessor());
final Map<String, String> attributes = new HashMap<>();
attributes.put("GROUP_ATTRIBUTE_KEY", "1");
attributes.put("KeyB", "hihii");
runner.enqueue("1,hello\n1,good-bye".getBytes(), attributes);
attributes.clear();
attributes.put("age", "34");
runner.enqueue("May Andersson".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 2);
Predicate<MockFlowFile> firstPredicate = mff -> mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1");
Predicate<MockFlowFile> either = firstPredicate.or(mff -> mff.isAttributeEqual("age", "34"));
runner.assertAllConditionsMet("success", either);
}
@Test @Test
public void testNumThreads() { public void testNumThreads() {
final ProcessorWithOnStop proc = new ProcessorWithOnStop(); final ProcessorWithOnStop proc = new ProcessorWithOnStop();
@ -186,4 +229,39 @@ public class TestStandardProcessorTestRunner {
counter++; counter++;
} }
} }
private static class GoodProcessor extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Successfully created FlowFile from ...")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("... execution failed. Incoming FlowFile will be penalized and routed to this relationship")
.build();
private final Set<Relationship> relationships;
public GoodProcessor() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
r.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(r);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
for( FlowFile incoming : session.get(20)) {
session.transfer(incoming, REL_SUCCESS);
}
}
}
} }