NIFI-5030: If ControlRate encounters a FlowFile that cannot be transferred, it should continue processing other FlowFiles that have different group attribute values

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2593.
This commit is contained in:
Mark Payne 2018-03-28 16:06:12 -04:00 committed by Pierre Villard
parent c1459825bb
commit bbbe428e7b
2 changed files with 53 additions and 12 deletions

View File

@ -394,18 +394,19 @@ public class ControlRate extends AbstractProcessor {
@Override
public FlowFileFilterResult filter(FlowFile flowFile) {
long accrual = getFlowFileAccrual(flowFile);
if(accrual < 0){
if (accrual < 0) {
// this FlowFile is invalid for this configuration so let the processor deal with it
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
}
String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile
.getAttribute(groupingAttributeName);
String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
// the flow file may not have the required attribute: in this case it is considered part
// of the DEFAULT_GROUP_ATTRIBUTE
if (groupName == null) {
groupName = DEFAULT_GROUP_ATTRIBUTE;
}
Throttle throttle = throttleMap.get(groupName);
if (throttle == null) {
throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
@ -436,7 +437,15 @@ public class ControlRate extends AbstractProcessor {
throttle.unlock();
}
return FlowFileFilterResult.REJECT_AND_TERMINATE;
// If we are not using a grouping attribute, then no FlowFile will be able to continue on. So we can
// just TERMINATE the iteration over FlowFiles.
// However, if we are using a grouping attribute, then another FlowFile in the queue may be able to proceed,
// so we want to continue our iteration.
if (groupingAttributeName == null) {
return FlowFileFilterResult.REJECT_AND_TERMINATE;
}
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
}
}

View File

@ -16,19 +16,51 @@
*/
package org.apache.nifi.processors.standard;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH;
import static org.junit.Assert.assertEquals;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestControlRate {
@Test
public void testLimitExceededThenOtherLimitNotExceeded() {
// If we have flowfiles queued that have different values for the "Rate Controlled Attribute"
// and we encounter a FlowFile whose rate should be throttled, we should continue pulling other flowfiles
// whose rate does not need to be throttled.
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "3");
runner.setProperty(ControlRate.TIME_PERIOD, "1 min");
runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
final Map<String, String> group1 = Collections.singletonMap("group", "1");
final Map<String, String> group2 = Collections.singletonMap("group", "2");
for (int i = 0; i < 5; i++) {
runner.enqueue("test data", group1);
}
runner.enqueue("test data", group2);
// Run several times, just to allow the processor to terminate the first poll if it wishes to
runner.run();
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4);
final List<MockFlowFile> output = runner.getFlowFilesForRelationship(ControlRate.REL_SUCCESS);
assertEquals(3L, output.stream().filter(ff -> ff.getAttribute("group").equals("1")).count());
assertEquals(1L, output.stream().filter(ff -> ff.getAttribute("group").equals("2")).count());
}
@Test
public void testFileCountRate() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());