diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 006b8eddf7..c73f86696a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -399,8 +399,13 @@ public class ControlRate extends AbstractProcessor { return FlowFileFilterResult.ACCEPT_AND_TERMINATE; } - final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile + String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile .getAttribute(groupingAttributeName); + // the flow file may not have the required attribute: in this case it is considered part + // of the DEFAULT_GROUP_ATTRIBUTE + if (groupName == null) { + groupName = DEFAULT_GROUP_ATTRIBUTE; + } Throttle throttle = throttleMap.get(groupName); if (throttle == null) { throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index 050f818654..0260276b49 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -207,6 +207,25 @@ public class TestControlRate { runner.assertQueueEmpty(); } + @Test + public void testNonExistingGroupAttribute() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_RATE, "2"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group"); + + createFlowFileWithGroup(runner, "one"); + createFlowFile(runner, 1); // no group set on this flow file + createFlowFileWithGroup(runner, "one"); + createFlowFile(runner, 2); // no group set on this flow file + + runner.run(4, false); + + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4); + runner.assertQueueEmpty(); + } + private void createFlowFile(final TestRunner runner, final int value) { final Map attributeMap = new HashMap<>(); attributeMap.put("count", String.valueOf(value));