NIFI-4629: This closes #2345. Put flowfiles without the grouping attribute in the default group

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Marco Gaido 2017-12-15 12:59:16 +01:00 committed by joewitt
parent 15eeb22116
commit 463dcd8812
2 changed files with 25 additions and 1 deletions

View File

@ -399,8 +399,13 @@ public class ControlRate extends AbstractProcessor {
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
}
final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile
String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile
.getAttribute(groupingAttributeName);
// the flow file may not have the required attribute: in this case it is considered part
// of the DEFAULT_GROUP_ATTRIBUTE
if (groupName == null) {
groupName = DEFAULT_GROUP_ATTRIBUTE;
}
Throttle throttle = throttleMap.get(groupName);
if (throttle == null) {
throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());

View File

@ -207,6 +207,25 @@ public class TestControlRate {
runner.assertQueueEmpty();
}
@Test
public void testNonExistingGroupAttribute() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "2");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
createFlowFileWithGroup(runner, "one");
createFlowFile(runner, 1); // no group set on this flow file
createFlowFileWithGroup(runner, "one");
createFlowFile(runner, 2); // no group set on this flow file
runner.run(4, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4);
runner.assertQueueEmpty();
}
private void createFlowFile(final TestRunner runner, final int value) {
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", String.valueOf(value));