diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java b/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java new file mode 100644 index 0000000000..aacc3cb20d --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/FlowFileValidator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.flowfile.FlowFile; + +public interface FlowFileValidator { + + /** + * Define a verification method to validate the given FlowFile + * + * @param f Flow file + */ + void assertFlowFile(FlowFile f); + +} diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index ea45dbf825..2b22761104 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -987,6 +987,39 @@ public class MockProcessSession implements ProcessSession { } } + /** + * Asserts that all FlowFiles that were transferred are compliant with the + * given validator. + * + * @param validator validator to use + */ + public void assertAllFlowFiles(FlowFileValidator validator) { + for (final Map.Entry> entry : transferMap.entrySet()) { + final List flowFiles = entry.getValue(); + for (MockFlowFile mockFlowFile : flowFiles) { + validator.assertFlowFile(mockFlowFile); + } + } + } + + /** + * Asserts that all FlowFiles that were transferred in the given relationship + * are compliant with the given validator. + * + * @param validator validator to use + */ + public void assertAllFlowFiles(Relationship relationship, FlowFileValidator validator) { + for (final Map.Entry> entry : transferMap.entrySet()) { + final List flowFiles = entry.getValue(); + final Relationship rel = entry.getKey(); + for (MockFlowFile mockFlowFile : flowFiles) { + if(rel.equals(relationship)) { + validator.assertFlowFile(mockFlowFile); + } + } + } + } + /** * Removes all state information about FlowFiles that have been transferred */ diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 5aa6d43514..edc1f06532 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -319,6 +319,40 @@ public class StandardProcessorTestRunner implements TestRunner { assertTransferCount(relationship, count); } + @Override + public void assertAllFlowFilesContainAttribute(String attributeName) { + assertAllFlowFiles(new FlowFileValidator() { + @Override + public void assertFlowFile(FlowFile f) { + Assert.assertTrue(f.getAttribute(attributeName) != null); + } + }); + } + + @Override + public void assertAllFlowFilesContainAttribute(Relationship relationship, String attributeName) { + assertAllFlowFiles(relationship, new FlowFileValidator() { + @Override + public void assertFlowFile(FlowFile f) { + Assert.assertTrue(f.getAttribute(attributeName) != null); + } + }); + } + + @Override + public void assertAllFlowFiles(FlowFileValidator validator) { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.assertAllFlowFiles(validator); + } + } + + @Override + public void assertAllFlowFiles(Relationship relationship, FlowFileValidator validator) { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.assertAllFlowFiles(relationship, validator); + } + } + @Override public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) { assertAllFlowFilesTransferred(relationship); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index d1211ef9bb..5832c2e88d 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -272,6 +272,39 @@ public interface TestRunner { */ void assertAllFlowFilesTransferred(Relationship relationship, int count); + /** + * Asserts that all FlowFiles that were transferred contain the given + * attribute. + * + * @param attributeName attribute to look for + */ + void assertAllFlowFilesContainAttribute(String attributeName); + + /** + * Asserts that all FlowFiles that were transferred to the given + * relationship contain the given attribute. + * + * @param relationship relationship to check + * @param attributeName attribute to look for + */ + void assertAllFlowFilesContainAttribute(Relationship relationship, String attributeName); + + /** + * Asserts that all FlowFiles that were transferred are compliant with the + * given validator. + * + * @param validator validator to use + */ + void assertAllFlowFiles(FlowFileValidator validator); + + /** + * Asserts that all FlowFiles that were transferred in the given relationship + * are compliant with the given validator. + * + * @param validator validator to use + */ + void assertAllFlowFiles(Relationship relationship, FlowFileValidator validator); + /** * Assert that the number of FlowFiles transferred to the given relationship * is equal to the given count diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java index 2ac908f895..342b01699c 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java @@ -18,10 +18,16 @@ package org.apache.nifi.util; import static org.junit.Assert.assertEquals; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.junit.Ignore; import org.junit.Test; @@ -56,6 +62,55 @@ public class TestStandardProcessorTestRunner { assertEquals(5, runner.getProcessContext().getMaxConcurrentTasks()); } + @Test + public void testFlowFileValidator() { + final AddAttributeProcessor proc = new AddAttributeProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(5, true); + runner.assertTransferCount(AddAttributeProcessor.REL_SUCCESS, 3); + runner.assertTransferCount(AddAttributeProcessor.REL_FAILURE, 2); + runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.REL_SUCCESS, AddAttributeProcessor.KEY); + runner.assertAllFlowFiles(AddAttributeProcessor.REL_SUCCESS, new FlowFileValidator() { + @Override + public void assertFlowFile(FlowFile f) { + assertEquals("value", f.getAttribute(AddAttributeProcessor.KEY)); + } + }); + } + + @Test(expected = AssertionError.class) + public void testFailFlowFileValidator() { + final AddAttributeProcessor proc = new AddAttributeProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(5, true); + runner.assertAllFlowFiles(new FlowFileValidator() { + @Override + public void assertFlowFile(FlowFile f) { + assertEquals("value", f.getAttribute(AddAttributeProcessor.KEY)); + } + }); + } + + @Test(expected = AssertionError.class) + public void testFailAllFlowFilesContainAttribute() { + final AddAttributeProcessor proc = new AddAttributeProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(5, true); + runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY); + } + + @Test + public void testAllFlowFilesContainAttribute() { + final AddAttributeProcessor proc = new AddAttributeProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(1, true); + runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY); + } + @Test(expected = AssertionError.class) @Ignore("This should not be enabled until we actually fail processor unit tests for using deprecated methods") public void testFailOnDeprecatedTypeAnnotation() { @@ -150,4 +205,38 @@ public class TestStandardProcessorTestRunner { } } + + private static class AddAttributeProcessor extends AbstractProcessor { + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("success").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("failure").build(); + public static final String KEY = "KEY"; + + private Set relationships; + private int counter = 0; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile ff = session.create(); + if(counter % 2 == 0) { + ff = session.putAttribute(ff, KEY, "value"); + session.transfer(ff, REL_SUCCESS); + } else { + session.transfer(ff, REL_FAILURE); + } + counter++; + } + } }