NIFI-2861 ControlRate should accept more than one flow file per execution

* Support multiple files per onTrigger call.

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #1412.
This commit is contained in:
Joe Skora 2017-01-12 16:28:34 +00:00 committed by Mike Moser
parent 2fbeabb95f
commit 4d533a99b3
2 changed files with 52 additions and 4 deletions

View File

@ -77,6 +77,9 @@ public class ControlRate extends AbstractProcessor {
public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE, public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
"Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration"); "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
// based on testing to balance commits and 10,000 FF swap limit
public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder() public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
.name("Rate Control Criteria") .name("Rate Control Criteria")
.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.") .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
@ -233,7 +236,7 @@ public class ControlRate extends AbstractProcessor {
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
List<FlowFile> flowFiles = session.get(new ThrottleFilter()); List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH));
if (flowFiles.isEmpty()) { if (flowFiles.isEmpty()) {
context.yield(); context.yield();
return; return;
@ -292,11 +295,11 @@ public class ControlRate extends AbstractProcessor {
case ATTRIBUTE_RATE: case ATTRIBUTE_RATE:
final String attributeValue = flowFile.getAttribute(rateControlAttribute); final String attributeValue = flowFile.getAttribute(rateControlAttribute);
if (attributeValue == null) { if (attributeValue == null) {
return -1l; return -1L;
} }
if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) { if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
return -1l; return -1L;
} }
rateValue = Long.parseLong(attributeValue); rateValue = Long.parseLong(attributeValue);
break; break;
@ -381,6 +384,13 @@ public class ControlRate extends AbstractProcessor {
private class ThrottleFilter implements FlowFileFilter { private class ThrottleFilter implements FlowFileFilter {
private final int flowFilesPerBatch;
private int flowFilesInBatch = 0;
ThrottleFilter(final int maxFFPerBatch) {
flowFilesPerBatch = maxFFPerBatch;
}
@Override @Override
public FlowFileFilterResult filter(FlowFile flowFile) { public FlowFileFilterResult filter(FlowFile flowFile) {
long accrual = getFlowFileAccrual(flowFile); long accrual = getFlowFileAccrual(flowFile);
@ -409,7 +419,13 @@ public class ControlRate extends AbstractProcessor {
throttle.lock(); throttle.lock();
try { try {
if (throttle.tryAdd(accrual)) { if (throttle.tryAdd(accrual)) {
flowFilesInBatch += 1;
if (flowFilesInBatch>= flowFilesPerBatch) {
flowFilesInBatch = 0;
return FlowFileFilterResult.ACCEPT_AND_TERMINATE; return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
} else {
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
}
} }
} finally { } finally {
throttle.unlock(); throttle.unlock();

View File

@ -24,6 +24,9 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Test; import org.junit.Test;
import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH;
import static org.junit.Assert.assertEquals;
public class TestControlRate { public class TestControlRate {
@Test @Test
@ -175,6 +178,35 @@ public class TestControlRate {
runner.assertQueueEmpty(); runner.assertQueueEmpty();
} }
@Test
public void testBatchLimit() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "5555");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
final int TEST_FILE_COUNT = 1500;
for (int i = 0; i < TEST_FILE_COUNT; i++) {
runner.enqueue("test data " + i);
}
runner.run(1, false);
// after 1 run should have MAX_FLOW_FILES_PER_BATCH files transferred and remainder of TEST_FILE_COUNT in queue
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, MAX_FLOW_FILES_PER_BATCH);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
assertEquals(TEST_FILE_COUNT - MAX_FLOW_FILES_PER_BATCH, runner.getQueueSize().getObjectCount());
runner.run(1, false);
// after 2 runs should have TEST_FILE_COUNT files transferred and 0 in queue
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, TEST_FILE_COUNT);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}
private void createFlowFile(final TestRunner runner, final int value) { private void createFlowFile(final TestRunner runner, final int value) {
final Map<String, String> attributeMap = new HashMap<>(); final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", String.valueOf(value)); attributeMap.put("count", String.valueOf(value));