From 7019e182b5b0e36cc46ad37456f435b2a84269d5 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Wed, 16 Nov 2022 16:03:53 -0600 Subject: [PATCH] NIFI-10835 Improved performance of TestControlRate - Added LongSupplier for TimedBuffer and ControlRate classes to support overriding System.currentTimeMillis() This closes #6671 Signed-off-by: Paul Grey --- .../nifi/util/timebuffer/TimedBuffer.java | 21 ++- .../nifi/processors/standard/ControlRate.java | 44 ++++-- .../processors/standard/TestControlRate.java | 144 +++++++++--------- 3 files changed, 116 insertions(+), 93 deletions(-) diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java index dd8e5232c4..af241a525f 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java @@ -18,6 +18,7 @@ package org.apache.nifi.util.timebuffer; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; public class TimedBuffer { @@ -25,20 +26,26 @@ public class TimedBuffer { private final EntitySum[] bins; private final EntityAccess entityAccess; private final TimeUnit binPrecision; + private final LongSupplier currentTimeSupplier; + + public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess accessor) { + this(binPrecision, numBins, accessor, System::currentTimeMillis); + } @SuppressWarnings("unchecked") - public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess accessor) { + public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess accessor, final LongSupplier currentTimeSupplier) { this.binPrecision = binPrecision; this.numBins = numBins + 1; this.bins = new EntitySum[this.numBins]; for (int i = 0; i < this.numBins; i++) { - this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor); + this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor, currentTimeSupplier); } this.entityAccess = accessor; + this.currentTimeSupplier = currentTimeSupplier; } public T add(final T entity) { - final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins); + final int binIdx = (int) (binPrecision.convert(currentTimeSupplier.getAsLong(), TimeUnit.MILLISECONDS) % numBins); final EntitySum sum = bins[binIdx]; return sum.addOrReset(entity); @@ -66,11 +73,13 @@ public class TimedBuffer { private final AtomicReference ref = new AtomicReference<>(); private final TimeUnit binPrecision; private final int numConfiguredBins; + private final LongSupplier currentTimeSupplier; - public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess aggregator) { + public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess aggregator, final LongSupplier currentTimeSupplier) { this.binPrecision = binPrecision; this.entityAccess = aggregator; this.numConfiguredBins = numConfiguredBins; + this.currentTimeSupplier = currentTimeSupplier; } private S add(final S event) { @@ -92,7 +101,7 @@ public class TimedBuffer { // entityAccess.getTimestamp(curValue) represents the time at which the current value // was last updated. If the last value is less than current time - 1 binPrecision, then it // means that we've rolled over and need to reset the value. - final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision); + final long maxExpectedTimePeriod = currentTimeSupplier.getAsLong() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision); final S curValue = ref.get(); return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod); @@ -102,7 +111,7 @@ public class TimedBuffer { // entityAccess.getTimestamp(curValue) represents the time at which the current value // was last updated. If the last value is less than current time - 1 binPrecision, then it // means that we've rolled over and need to reset the value. - final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision); + final long maxExpectedTimePeriod = currentTimeSupplier.getAsLong() - TimeUnit.MILLISECONDS.convert(1, binPrecision); final S curValue = ref.get(); if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 34b9a8144b..ef62eee94f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.LongSupplier; import java.util.regex.Pattern; @SideEffectFree @@ -165,7 +166,7 @@ public class ControlRate extends AbstractProcessor { private final ConcurrentMap dataThrottleMap = new ConcurrentHashMap<>(); private final ConcurrentMap countThrottleMap = new ConcurrentHashMap<>(); - private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis()); + private final AtomicLong lastThrottleClearTime = new AtomicLong(getCurrentTimeMillis()); private volatile String rateControlCriteria = null; private volatile String rateControlAttribute = null; private volatile String maximumRateStr = null; @@ -299,7 +300,7 @@ public class ControlRate extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - List flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH)); + List flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis)); if (flowFiles.isEmpty()) { context.yield(); return; @@ -307,9 +308,9 @@ public class ControlRate extends AbstractProcessor { // Periodically clear any Throttle that has not been used in more than 2 throttling periods final long lastClearTime = lastThrottleClearTime.get(); - final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); + final long throttleExpirationMillis = getCurrentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); if (lastClearTime < throttleExpirationMillis) { - if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) { + if (lastThrottleClearTime.compareAndSet(lastClearTime, getCurrentTimeMillis())) { final Set> throttleSet = new HashSet<>(); if (dataThrottleRequired()) { throttleSet.addAll(dataThrottleMap.entrySet()); @@ -337,16 +338,25 @@ public class ControlRate extends AbstractProcessor { final ComponentLog logger = getLogger(); for (FlowFile flowFile : flowFiles) { // call this to capture potential error - if (!isAccrualPossible(flowFile)) { - logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile); - session.transfer(flowFile, REL_FAILURE); - } else { + if (isAccrualPossible(flowFile)) { logger.info("transferring {} to 'success'", flowFile); session.transfer(flowFile, REL_SUCCESS); + } else { + logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile); + session.transfer(flowFile, REL_FAILURE); } } } + /** + * Get current time in milliseconds + * + * @return Current time in milliseconds from System + */ + protected long getCurrentTimeMillis() { + return System.currentTimeMillis(); + } + /* * Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on * flowfile attribute, the specified attribute must be present and must be a long integer. @@ -404,15 +414,17 @@ public class ControlRate extends AbstractProcessor { private final long timePeriodMillis; private final TimedBuffer timedBuffer; private final ComponentLog logger; + private final LongSupplier currentTimeSupplier; private volatile long penalizationPeriod = 0; private volatile long penalizationExpired = 0; private volatile long lastUpdateTime; - public Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger) { + private Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger, final LongSupplier currentTimeSupplier) { this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod, unit); - this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess()); + this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess(), currentTimeSupplier); this.logger = logger; + this.currentTimeSupplier = currentTimeSupplier; } public void setMaxRate(final long maxRate) { @@ -428,7 +440,7 @@ public class ControlRate extends AbstractProcessor { if (value < 0) { return false; } - final long now = System.currentTimeMillis(); + final long now = currentTimeSupplier.getAsLong(); if (penalizationExpired > now) { return false; } @@ -478,10 +490,12 @@ public class ControlRate extends AbstractProcessor { private class ThrottleFilter implements FlowFileFilter { private final int flowFilesPerBatch; + private final LongSupplier currentTimeSupplier; private int flowFilesInBatch = 0; - ThrottleFilter(final int maxFFPerBatch) { - flowFilesPerBatch = maxFFPerBatch; + ThrottleFilter(final int maxFFPerBatch, final LongSupplier currentTimeSupplier) { + this.flowFilesPerBatch = maxFFPerBatch; + this.currentTimeSupplier = currentTimeSupplier; } @Override @@ -505,7 +519,7 @@ public class ControlRate extends AbstractProcessor { boolean dataThrottlingActive = false; if (dataThrottleRequired()) { if (dataThrottle == null) { - dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger()); + dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), currentTimeSupplier); dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue()); dataThrottleMap.put(groupName, dataThrottle); } @@ -534,7 +548,7 @@ public class ControlRate extends AbstractProcessor { // continue processing count throttle only if required and if data throttle is not already limiting flowfiles if (countThrottleRequired() && !dataThrottlingActive) { if (countThrottle == null) { - countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger()); + countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), currentTimeSupplier); countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr)); countThrottleMap.put(groupName, countThrottle); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index 0b68022622..eb33a7c9df 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -27,18 +27,30 @@ import java.util.Map; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class TestControlRate { - private static final long ONE_SEC_PLUS = 1010L; + private static final String ONE_SECOND_TIME_PERIOD = "1 s"; + + private static final long CURRENT_TIME_INCREMENT = 1100; + + private ConfigurableControlRate controlRate; + + private TestRunner runner; + + @BeforeEach + public void setRunner() { + controlRate = new ConfigurableControlRate(); + runner = TestRunners.newTestRunner(controlRate); + } @Test public void testLimitExceededThenOtherLimitNotExceeded() { // If we have flowfiles queued that have different values for the "Rate Controlled Attribute" // and we encounter a FlowFile whose rate should be throttled, we should continue pulling other flowfiles // whose rate does not need to be throttled. - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); runner.setProperty(ControlRate.MAX_RATE, "3"); runner.setProperty(ControlRate.TIME_PERIOD, "1 min"); @@ -64,11 +76,10 @@ public class TestControlRate { } @Test - public void testFileCountRate() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testFileCountRate() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); runner.setProperty(ControlRate.MAX_RATE, "3"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); runner.enqueue("test data 1"); runner.enqueue("test data 2"); @@ -86,18 +97,18 @@ public class TestControlRate { runner.assertQueueNotEmpty(); // we have sent 3 files and after 1 second, we should be able to send the 4th - Thread.sleep(ONE_SEC_PLUS); - runner.run(); + incrementCurrentTime(); + + runner.run(5); runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1); runner.assertQueueEmpty(); } @Test - public void testFileCountWithGrouping() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testFileCountWithGrouping() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); runner.setProperty(ControlRate.MAX_RATE, "2"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group"); createFlowFileWithGroup(runner, "one"); @@ -118,18 +129,17 @@ public class TestControlRate { runner.assertQueueNotEmpty(); // we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group - Thread.sleep(ONE_SEC_PLUS); + incrementCurrentTime(); runner.run(2); runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2); runner.assertQueueEmpty(); } @Test - public void testDataSizeRate() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testDataSizeRate() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE); runner.setProperty(ControlRate.MAX_RATE, "20 b"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); runner.enqueue("testdata 1"); runner.enqueue("testdata 2"); @@ -147,19 +157,18 @@ public class TestControlRate { runner.assertQueueNotEmpty(); // we have sent 20 bytes and after 1 second, we should be able to send 20 more - Thread.sleep(ONE_SEC_PLUS); + incrementCurrentTime(); runner.run(2, false); runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2); runner.assertQueueEmpty(); } @Test - public void testViaAttribute() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testViaAttribute() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE); runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count"); runner.setProperty(ControlRate.MAX_RATE, "20000"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); createFlowFile(runner, 1000); createFlowFile(runner, 3000); @@ -178,14 +187,14 @@ public class TestControlRate { runner.assertTransferCount(ControlRate.REL_SUCCESS, 0); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); runner.assertQueueNotEmpty(); - Thread.sleep(1200L); + incrementCurrentTime(1450); // at this point, more than TIME_PERIOD 1.0 seconds but less than 1.45 seconds have passed runner.run(50, false); runner.assertTransferCount(ControlRate.REL_SUCCESS, 0); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); runner.assertQueueNotEmpty(); - Thread.sleep(600L); + incrementCurrentTime(600); // at this point, more than 1.45 seconds have passed, so we should be able to send another 20,000 runner.run(); @@ -195,12 +204,11 @@ public class TestControlRate { } @Test - public void testAttributeDoesNotExist() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testAttributeDoesNotExist() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE); runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "no.such.attribute"); runner.setProperty(ControlRate.MAX_RATE, "20000"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); createFlowFile(runner, 1000); createFlowFile(runner, 3000); @@ -218,11 +226,10 @@ public class TestControlRate { @Test public void testBadAttributeRate() { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE); runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count"); runner.setProperty(ControlRate.MAX_RATE, "20000"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); final Map attributeMap = new HashMap<>(); attributeMap.put("count", "bad string"); @@ -236,10 +243,9 @@ public class TestControlRate { @Test public void testBatchLimit() { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); runner.setProperty(ControlRate.MAX_RATE, "5555"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); final int TEST_FILE_COUNT = 1500; @@ -265,10 +271,9 @@ public class TestControlRate { @Test public void testNonExistingGroupAttribute() { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); runner.setProperty(ControlRate.MAX_RATE, "2"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group"); createFlowFileWithGroup(runner, "one"); @@ -283,11 +288,10 @@ public class TestControlRate { } @Test - public void testIncreaseDataRate() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testIncreaseDataRate() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE); runner.setProperty(ControlRate.MAX_RATE, "11 B"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); runner.enqueue("test data 1"); runner.enqueue("test data 2"); @@ -310,7 +314,7 @@ public class TestControlRate { runner.assertQueueNotEmpty(); // after 1 second, we should be able to send the up to 3 more flowfiles - Thread.sleep(ONE_SEC_PLUS); + incrementCurrentTime(); runner.run(7, false); runner.assertTransferCount(ControlRate.REL_SUCCESS, 6); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); @@ -318,11 +322,10 @@ public class TestControlRate { } @Test - public void testIncreaseFlowFileRate() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testIncreaseFlowFileRate() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); runner.setProperty(ControlRate.MAX_RATE, "1"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); runner.enqueue("test data 1"); runner.enqueue("test data 2"); @@ -345,7 +348,7 @@ public class TestControlRate { runner.assertQueueNotEmpty(); // after 1 second, we should be able to send the up to 3 more flowfiles - Thread.sleep(ONE_SEC_PLUS); + incrementCurrentTime(); runner.run(7, false); runner.assertTransferCount(ControlRate.REL_SUCCESS, 6); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); @@ -353,10 +356,9 @@ public class TestControlRate { } @Test - public void testDataOrFlowFileCountLimitedByBytes() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testDataOrFlowFileCountLimitedByBytes() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); // Data rate will throttle before FlowFile count runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B"); runner.setProperty(ControlRate.MAX_COUNT_RATE, "3"); @@ -377,17 +379,16 @@ public class TestControlRate { runner.assertTransferCount(ControlRate.REL_FAILURE, 0); runner.assertQueueNotEmpty(); // we have sent 22 bytes and after 1 second, we should be able to send 22 more - Thread.sleep(ONE_SEC_PLUS); + incrementCurrentTime(1500); runner.run(4, false); runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1); runner.assertQueueEmpty(); } @Test - public void testDataOrFlowFileCountLimitedByCount() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testDataOrFlowFileCountLimitedByCount() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); // FlowFile count rate will throttle before data rate runner.setProperty(ControlRate.MAX_DATA_RATE, "44 B"); // greater than all flowfiles to be queued runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); // limit to 1 flowfile per second @@ -396,32 +397,23 @@ public class TestControlRate { runner.enqueue("test data 2"); runner.enqueue("test data 3"); - runner.run(4, false); + runner.run(1, false); runner.assertTransferCount(ControlRate.REL_SUCCESS, 1); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); runner.assertQueueNotEmpty(); - // we have sent 1 flowfile and after 1 second, we should be able to send 1 more - Thread.sleep(ONE_SEC_PLUS); - runner.run(4, false); - runner.assertTransferCount(ControlRate.REL_SUCCESS, 2); - runner.assertTransferCount(ControlRate.REL_FAILURE, 0); - runner.assertQueueNotEmpty(); - - // we have sent 2 flowfile and after 1 second, we should be able to send 1 more - Thread.sleep(ONE_SEC_PLUS); - runner.run(4, false); - runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3); + incrementCurrentTime(2000); + runner.run(1, false); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 3); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); runner.assertQueueEmpty(); } @Test - public void testDataOrFlowFileCountLimitedByBytesThenCount() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + public void testDataOrFlowFileCountLimitedByBytesThenCount() { runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD); // Data rate will throttle before FlowFile count runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B"); runner.setProperty(ControlRate.MAX_COUNT_RATE, "5"); @@ -440,27 +432,17 @@ public class TestControlRate { runner.assertTransferCount(ControlRate.REL_SUCCESS, 2); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); runner.assertQueueNotEmpty(); - runner.clearTransferState(); // we have sent 2 flowfile and after 1 second, we should be able to send more, now limited by flowfile count - Thread.sleep(ONE_SEC_PLUS); - runner.run(10, false); - runner.assertTransferCount(ControlRate.REL_SUCCESS, 5); - runner.assertTransferCount(ControlRate.REL_FAILURE, 0); - runner.assertQueueNotEmpty(); - runner.clearTransferState(); - - // after 1 second, we should be able to send the remaining flowfile - Thread.sleep(ONE_SEC_PLUS); - runner.run(10, false); - runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1); + incrementCurrentTime(1500); + runner.run(1, false); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 8); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); runner.assertQueueEmpty(); } @Test public void testValidate() { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE); runner.assertNotValid(); // MAX_RATE is not set runner.setProperty(ControlRate.MAX_RATE, "1"); @@ -527,4 +509,22 @@ public class TestControlRate { attributeMap.put("group", group); runner.enqueue(new byte[0], attributeMap); } + + private void incrementCurrentTime() { + controlRate.currentTimeMillis += CURRENT_TIME_INCREMENT; + } + + private void incrementCurrentTime(final long milliseconds) { + controlRate.currentTimeMillis += milliseconds; + } + + private static class ConfigurableControlRate extends ControlRate { + + private long currentTimeMillis = System.currentTimeMillis(); + + @Override + protected long getCurrentTimeMillis() { + return currentTimeMillis; + } + } }