From bbbe428e7bf6f0a6cc7687cbd750431ae81fad8f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 28 Mar 2018 16:06:12 -0400 Subject: [PATCH] NIFI-5030: If ControlRate encounters a FlowFile that cannot be transferred, it should continue processing other FlowFiles that have different group attribute values Signed-off-by: Pierre Villard This closes #2593. --- .../nifi/processors/standard/ControlRate.java | 17 +++++-- .../processors/standard/TestControlRate.java | 48 +++++++++++++++---- 2 files changed, 53 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index c73f86696a..cf2364adcc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -394,18 +394,19 @@ public class ControlRate extends AbstractProcessor { @Override public FlowFileFilterResult filter(FlowFile flowFile) { long accrual = getFlowFileAccrual(flowFile); - if(accrual < 0){ + if (accrual < 0) { // this FlowFile is invalid for this configuration so let the processor deal with it return FlowFileFilterResult.ACCEPT_AND_TERMINATE; } - String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile - .getAttribute(groupingAttributeName); + String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName); + // the flow file may not have the required attribute: in this case it is considered part // of the DEFAULT_GROUP_ATTRIBUTE if (groupName == null) { groupName = DEFAULT_GROUP_ATTRIBUTE; } + Throttle throttle = throttleMap.get(groupName); if (throttle == null) { throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger()); @@ -436,7 +437,15 @@ public class ControlRate extends AbstractProcessor { throttle.unlock(); } - return FlowFileFilterResult.REJECT_AND_TERMINATE; + // If we are not using a grouping attribute, then no FlowFile will be able to continue on. So we can + // just TERMINATE the iteration over FlowFiles. + // However, if we are using a grouping attribute, then another FlowFile in the queue may be able to proceed, + // so we want to continue our iteration. + if (groupingAttributeName == null) { + return FlowFileFilterResult.REJECT_AND_TERMINATE; + } + + return FlowFileFilterResult.REJECT_AND_CONTINUE; } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index 0260276b49..dea87d97cf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -16,19 +16,51 @@ */ package org.apache.nifi.processors.standard; -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; - -import org.junit.Test; - import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH; import static org.junit.Assert.assertEquals; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + public class TestControlRate { + @Test + public void testLimitExceededThenOtherLimitNotExceeded() { + // If we have flowfiles queued that have different values for the "Rate Controlled Attribute" + // and we encounter a FlowFile whose rate should be throttled, we should continue pulling other flowfiles + // whose rate does not need to be throttled. + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_RATE, "3"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 min"); + runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group"); + + final Map group1 = Collections.singletonMap("group", "1"); + final Map group2 = Collections.singletonMap("group", "2"); + + for (int i = 0; i < 5; i++) { + runner.enqueue("test data", group1); + } + + runner.enqueue("test data", group2); + + // Run several times, just to allow the processor to terminate the first poll if it wishes to + runner.run(); + + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4); + + final List output = runner.getFlowFilesForRelationship(ControlRate.REL_SUCCESS); + assertEquals(3L, output.stream().filter(ff -> ff.getAttribute("group").equals("1")).count()); + assertEquals(1L, output.stream().filter(ff -> ff.getAttribute("group").equals("2")).count()); + } + @Test public void testFileCountRate() throws InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new ControlRate());