NIFI-10835 Improved performance of TestControlRate

- Added LongSupplier for TimedBuffer and ControlRate classes to support overriding System.currentTimeMillis()

This closes #6671
Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
exceptionfactory 2022-11-16 16:03:53 -06:00 committed by Paul Grey
parent 54108689b6
commit 7019e182b5
No known key found for this signature in database
GPG Key ID: 8DDF32B9C7EE39D0
3 changed files with 116 additions and 93 deletions

View File

@ -18,6 +18,7 @@ package org.apache.nifi.util.timebuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
public class TimedBuffer<T> {
@ -25,20 +26,26 @@ public class TimedBuffer<T> {
private final EntitySum<T>[] bins;
private final EntityAccess<T> entityAccess;
private final TimeUnit binPrecision;
private final LongSupplier currentTimeSupplier;
public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) {
this(binPrecision, numBins, accessor, System::currentTimeMillis);
}
@SuppressWarnings("unchecked")
public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) {
public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor, final LongSupplier currentTimeSupplier) {
this.binPrecision = binPrecision;
this.numBins = numBins + 1;
this.bins = new EntitySum[this.numBins];
for (int i = 0; i < this.numBins; i++) {
this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor);
this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor, currentTimeSupplier);
}
this.entityAccess = accessor;
this.currentTimeSupplier = currentTimeSupplier;
}
public T add(final T entity) {
final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins);
final int binIdx = (int) (binPrecision.convert(currentTimeSupplier.getAsLong(), TimeUnit.MILLISECONDS) % numBins);
final EntitySum<T> sum = bins[binIdx];
return sum.addOrReset(entity);
@ -66,11 +73,13 @@ public class TimedBuffer<T> {
private final AtomicReference<S> ref = new AtomicReference<>();
private final TimeUnit binPrecision;
private final int numConfiguredBins;
private final LongSupplier currentTimeSupplier;
public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) {
public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator, final LongSupplier currentTimeSupplier) {
this.binPrecision = binPrecision;
this.entityAccess = aggregator;
this.numConfiguredBins = numConfiguredBins;
this.currentTimeSupplier = currentTimeSupplier;
}
private S add(final S event) {
@ -92,7 +101,7 @@ public class TimedBuffer<T> {
// entityAccess.getTimestamp(curValue) represents the time at which the current value
// was last updated. If the last value is less than current time - 1 binPrecision, then it
// means that we've rolled over and need to reset the value.
final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
final long maxExpectedTimePeriod = currentTimeSupplier.getAsLong() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
final S curValue = ref.get();
return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod);
@ -102,7 +111,7 @@ public class TimedBuffer<T> {
// entityAccess.getTimestamp(curValue) represents the time at which the current value
// was last updated. If the last value is less than current time - 1 binPrecision, then it
// means that we've rolled over and need to reset the value.
final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
final long maxExpectedTimePeriod = currentTimeSupplier.getAsLong() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
final S curValue = ref.get();
if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) {

View File

@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;
import java.util.regex.Pattern;
@SideEffectFree
@ -165,7 +166,7 @@ public class ControlRate extends AbstractProcessor {
private final ConcurrentMap<String, Throttle> dataThrottleMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Throttle> countThrottleMap = new ConcurrentHashMap<>();
private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
private final AtomicLong lastThrottleClearTime = new AtomicLong(getCurrentTimeMillis());
private volatile String rateControlCriteria = null;
private volatile String rateControlAttribute = null;
private volatile String maximumRateStr = null;
@ -299,7 +300,7 @@ public class ControlRate extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH));
List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis));
if (flowFiles.isEmpty()) {
context.yield();
return;
@ -307,9 +308,9 @@ public class ControlRate extends AbstractProcessor {
// Periodically clear any Throttle that has not been used in more than 2 throttling periods
final long lastClearTime = lastThrottleClearTime.get();
final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
final long throttleExpirationMillis = getCurrentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
if (lastClearTime < throttleExpirationMillis) {
if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
if (lastThrottleClearTime.compareAndSet(lastClearTime, getCurrentTimeMillis())) {
final Set<Map.Entry<String, Throttle>> throttleSet = new HashSet<>();
if (dataThrottleRequired()) {
throttleSet.addAll(dataThrottleMap.entrySet());
@ -337,16 +338,25 @@ public class ControlRate extends AbstractProcessor {
final ComponentLog logger = getLogger();
for (FlowFile flowFile : flowFiles) {
// call this to capture potential error
if (!isAccrualPossible(flowFile)) {
logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
session.transfer(flowFile, REL_FAILURE);
} else {
if (isAccrualPossible(flowFile)) {
logger.info("transferring {} to 'success'", flowFile);
session.transfer(flowFile, REL_SUCCESS);
} else {
logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}
/**
* Get current time in milliseconds
*
* @return Current time in milliseconds from System
*/
protected long getCurrentTimeMillis() {
return System.currentTimeMillis();
}
/*
* Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on
* flowfile attribute, the specified attribute must be present and must be a long integer.
@ -404,15 +414,17 @@ public class ControlRate extends AbstractProcessor {
private final long timePeriodMillis;
private final TimedBuffer<TimestampedLong> timedBuffer;
private final ComponentLog logger;
private final LongSupplier currentTimeSupplier;
private volatile long penalizationPeriod = 0;
private volatile long penalizationExpired = 0;
private volatile long lastUpdateTime;
public Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger) {
private Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger, final LongSupplier currentTimeSupplier) {
this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod, unit);
this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess());
this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess(), currentTimeSupplier);
this.logger = logger;
this.currentTimeSupplier = currentTimeSupplier;
}
public void setMaxRate(final long maxRate) {
@ -428,7 +440,7 @@ public class ControlRate extends AbstractProcessor {
if (value < 0) {
return false;
}
final long now = System.currentTimeMillis();
final long now = currentTimeSupplier.getAsLong();
if (penalizationExpired > now) {
return false;
}
@ -478,10 +490,12 @@ public class ControlRate extends AbstractProcessor {
private class ThrottleFilter implements FlowFileFilter {
private final int flowFilesPerBatch;
private final LongSupplier currentTimeSupplier;
private int flowFilesInBatch = 0;
ThrottleFilter(final int maxFFPerBatch) {
flowFilesPerBatch = maxFFPerBatch;
ThrottleFilter(final int maxFFPerBatch, final LongSupplier currentTimeSupplier) {
this.flowFilesPerBatch = maxFFPerBatch;
this.currentTimeSupplier = currentTimeSupplier;
}
@Override
@ -505,7 +519,7 @@ public class ControlRate extends AbstractProcessor {
boolean dataThrottlingActive = false;
if (dataThrottleRequired()) {
if (dataThrottle == null) {
dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), currentTimeSupplier);
dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
dataThrottleMap.put(groupName, dataThrottle);
}
@ -534,7 +548,7 @@ public class ControlRate extends AbstractProcessor {
// continue processing count throttle only if required and if data throttle is not already limiting flowfiles
if (countThrottleRequired() && !dataThrottlingActive) {
if (countThrottle == null) {
countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), currentTimeSupplier);
countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr));
countThrottleMap.put(groupName, countThrottle);
}

View File

@ -27,18 +27,30 @@ import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class TestControlRate {
private static final long ONE_SEC_PLUS = 1010L;
private static final String ONE_SECOND_TIME_PERIOD = "1 s";
private static final long CURRENT_TIME_INCREMENT = 1100;
private ConfigurableControlRate controlRate;
private TestRunner runner;
@BeforeEach
public void setRunner() {
controlRate = new ConfigurableControlRate();
runner = TestRunners.newTestRunner(controlRate);
}
@Test
public void testLimitExceededThenOtherLimitNotExceeded() {
// If we have flowfiles queued that have different values for the "Rate Controlled Attribute"
// and we encounter a FlowFile whose rate should be throttled, we should continue pulling other flowfiles
// whose rate does not need to be throttled.
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "3");
runner.setProperty(ControlRate.TIME_PERIOD, "1 min");
@ -64,11 +76,10 @@ public class TestControlRate {
}
@Test
public void testFileCountRate() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testFileCountRate() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "3");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.enqueue("test data 1");
runner.enqueue("test data 2");
@ -86,18 +97,18 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 3 files and after 1 second, we should be able to send the 4th
Thread.sleep(ONE_SEC_PLUS);
runner.run();
incrementCurrentTime();
runner.run(5);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
runner.assertQueueEmpty();
}
@Test
public void testFileCountWithGrouping() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testFileCountWithGrouping() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "2");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
createFlowFileWithGroup(runner, "one");
@ -118,18 +129,17 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group
Thread.sleep(ONE_SEC_PLUS);
incrementCurrentTime();
runner.run(2);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.assertQueueEmpty();
}
@Test
public void testDataSizeRate() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testDataSizeRate() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
runner.setProperty(ControlRate.MAX_RATE, "20 b");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.enqueue("testdata 1");
runner.enqueue("testdata 2");
@ -147,19 +157,18 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 20 bytes and after 1 second, we should be able to send 20 more
Thread.sleep(ONE_SEC_PLUS);
incrementCurrentTime();
runner.run(2, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.assertQueueEmpty();
}
@Test
public void testViaAttribute() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testViaAttribute() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
runner.setProperty(ControlRate.MAX_RATE, "20000");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
createFlowFile(runner, 1000);
createFlowFile(runner, 3000);
@ -178,14 +187,14 @@ public class TestControlRate {
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
Thread.sleep(1200L);
incrementCurrentTime(1450);
// at this point, more than TIME_PERIOD 1.0 seconds but less than 1.45 seconds have passed
runner.run(50, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
Thread.sleep(600L);
incrementCurrentTime(600);
// at this point, more than 1.45 seconds have passed, so we should be able to send another 20,000
runner.run();
@ -195,12 +204,11 @@ public class TestControlRate {
}
@Test
public void testAttributeDoesNotExist() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testAttributeDoesNotExist() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "no.such.attribute");
runner.setProperty(ControlRate.MAX_RATE, "20000");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
createFlowFile(runner, 1000);
createFlowFile(runner, 3000);
@ -218,11 +226,10 @@ public class TestControlRate {
@Test
public void testBadAttributeRate() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
runner.setProperty(ControlRate.MAX_RATE, "20000");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", "bad string");
@ -236,10 +243,9 @@ public class TestControlRate {
@Test
public void testBatchLimit() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "5555");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
final int TEST_FILE_COUNT = 1500;
@ -265,10 +271,9 @@ public class TestControlRate {
@Test
public void testNonExistingGroupAttribute() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "2");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
createFlowFileWithGroup(runner, "one");
@ -283,11 +288,10 @@ public class TestControlRate {
}
@Test
public void testIncreaseDataRate() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testIncreaseDataRate() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
runner.setProperty(ControlRate.MAX_RATE, "11 B");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.enqueue("test data 1");
runner.enqueue("test data 2");
@ -310,7 +314,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// after 1 second, we should be able to send the up to 3 more flowfiles
Thread.sleep(ONE_SEC_PLUS);
incrementCurrentTime();
runner.run(7, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
@ -318,11 +322,10 @@ public class TestControlRate {
}
@Test
public void testIncreaseFlowFileRate() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testIncreaseFlowFileRate() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "1");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.enqueue("test data 1");
runner.enqueue("test data 2");
@ -345,7 +348,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// after 1 second, we should be able to send the up to 3 more flowfiles
Thread.sleep(ONE_SEC_PLUS);
incrementCurrentTime();
runner.run(7, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
@ -353,10 +356,9 @@ public class TestControlRate {
}
@Test
public void testDataOrFlowFileCountLimitedByBytes() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testDataOrFlowFileCountLimitedByBytes() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
// Data rate will throttle before FlowFile count
runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
runner.setProperty(ControlRate.MAX_COUNT_RATE, "3");
@ -377,17 +379,16 @@ public class TestControlRate {
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// we have sent 22 bytes and after 1 second, we should be able to send 22 more
Thread.sleep(ONE_SEC_PLUS);
incrementCurrentTime(1500);
runner.run(4, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
runner.assertQueueEmpty();
}
@Test
public void testDataOrFlowFileCountLimitedByCount() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testDataOrFlowFileCountLimitedByCount() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
// FlowFile count rate will throttle before data rate
runner.setProperty(ControlRate.MAX_DATA_RATE, "44 B"); // greater than all flowfiles to be queued
runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); // limit to 1 flowfile per second
@ -396,32 +397,23 @@ public class TestControlRate {
runner.enqueue("test data 2");
runner.enqueue("test data 3");
runner.run(4, false);
runner.run(1, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// we have sent 1 flowfile and after 1 second, we should be able to send 1 more
Thread.sleep(ONE_SEC_PLUS);
runner.run(4, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// we have sent 2 flowfile and after 1 second, we should be able to send 1 more
Thread.sleep(ONE_SEC_PLUS);
runner.run(4, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3);
incrementCurrentTime(2000);
runner.run(1, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}
@Test
public void testDataOrFlowFileCountLimitedByBytesThenCount() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
public void testDataOrFlowFileCountLimitedByBytesThenCount() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
// Data rate will throttle before FlowFile count
runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
runner.setProperty(ControlRate.MAX_COUNT_RATE, "5");
@ -440,27 +432,17 @@ public class TestControlRate {
runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
runner.clearTransferState();
// we have sent 2 flowfile and after 1 second, we should be able to send more, now limited by flowfile count
Thread.sleep(ONE_SEC_PLUS);
runner.run(10, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 5);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
runner.clearTransferState();
// after 1 second, we should be able to send the remaining flowfile
Thread.sleep(ONE_SEC_PLUS);
runner.run(10, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
incrementCurrentTime(1500);
runner.run(1, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 8);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}
@Test
public void testValidate() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
runner.assertNotValid(); // MAX_RATE is not set
runner.setProperty(ControlRate.MAX_RATE, "1");
@ -527,4 +509,22 @@ public class TestControlRate {
attributeMap.put("group", group);
runner.enqueue(new byte[0], attributeMap);
}
private void incrementCurrentTime() {
controlRate.currentTimeMillis += CURRENT_TIME_INCREMENT;
}
private void incrementCurrentTime(final long milliseconds) {
controlRate.currentTimeMillis += milliseconds;
}
private static class ConfigurableControlRate extends ControlRate {
private long currentTimeMillis = System.currentTimeMillis();
@Override
protected long getCurrentTimeMillis() {
return currentTimeMillis;
}
}
}