diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 4634f041e1..34b9a8144b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -16,21 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Pattern; - import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -42,7 +27,6 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -59,6 +43,21 @@ import org.apache.nifi.util.timebuffer.LongEntityAccess; import org.apache.nifi.util.timebuffer.TimedBuffer; import org.apache.nifi.util.timebuffer.TimestampedLong; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + @SideEffectFree @TriggerSerially @InputRequirement(Requirement.INPUT_REQUIRED) @@ -71,30 +70,58 @@ public class ControlRate extends AbstractProcessor { public static final String DATA_RATE = "data rate"; public static final String FLOWFILE_RATE = "flowfile count"; public static final String ATTRIBUTE_RATE = "attribute value"; + public static final String DATA_OR_FLOWFILE_RATE = "data rate or flowfile count"; + public static final AllowableValue DATA_RATE_VALUE = new AllowableValue(DATA_RATE, DATA_RATE, "Rate is controlled by counting bytes transferred per time duration."); public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue(FLOWFILE_RATE, FLOWFILE_RATE, - "Rate is controlled by counting flowfiles transferred per time duration"); + "Rate is controlled by counting FlowFiles transferred per time duration"); public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE, "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration"); + public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE, + "Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced"); // based on testing to balance commits and 10,000 FF swap limit public static final int MAX_FLOW_FILES_PER_BATCH = 1000; + private static final long DEFAULT_ACCRUAL_COUNT = -1L; public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder() .name("Rate Control Criteria") + .displayName("Rate Control Criteria") .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.") .required(true) - .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE) + .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE, DATA_OR_FLOWFILE_RATE_VALUE) .defaultValue(DATA_RATE) .build(); public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder() .name("Maximum Rate") + .displayName("Maximum Rate") .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a " + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.") - .required(true) + .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria + .dependsOn(RATE_CONTROL_CRITERIA, DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE) .build(); + public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder() + .name("Maximum Data Rate") + .displayName("Maximum Data Rate") + .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a " + + "Data Size (such as '1 MB') representing bytes per Time Duration.") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE) + .build(); + + public static final PropertyDescriptor MAX_COUNT_RATE = new PropertyDescriptor.Builder() + .name("Maximum FlowFile Rate") + .displayName("Maximum FlowFile Rate") + .description("The maximum rate at which FlowFiles should pass through this processor. The format of this property is expected to be a " + + "positive integer representing FlowFiles count per Time Duration") + .required(false) + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) + .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE) + .build(); + public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() .name("Rate Controlled Attribute") .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. " @@ -103,6 +130,7 @@ public class ControlRate extends AbstractProcessor { .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .dependsOn(RATE_CONTROL_CRITERIA, ATTRIBUTE_RATE) .build(); public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder() .name("Time Duration") @@ -135,11 +163,13 @@ public class ControlRate extends AbstractProcessor { private List properties; private Set relationships; - private final ConcurrentMap throttleMap = new ConcurrentHashMap<>(); + private final ConcurrentMap dataThrottleMap = new ConcurrentHashMap<>(); + private final ConcurrentMap countThrottleMap = new ConcurrentHashMap<>(); private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis()); private volatile String rateControlCriteria = null; private volatile String rateControlAttribute = null; private volatile String maximumRateStr = null; + private volatile String maximumCountRateStr = null; private volatile String groupingAttributeName = null; private volatile int timePeriodSeconds = 1; @@ -147,9 +177,11 @@ public class ControlRate extends AbstractProcessor { protected void init(final ProcessorInitializationContext context) { final List properties = new ArrayList<>(); properties.add(RATE_CONTROL_CRITERIA); - properties.add(MAX_RATE); - properties.add(RATE_CONTROL_ATTRIBUTE_NAME); properties.add(TIME_PERIOD); + properties.add(MAX_RATE); + properties.add(MAX_DATA_RATE); + properties.add(MAX_COUNT_RATE); + properties.add(RATE_CONTROL_ATTRIBUTE_NAME); properties.add(GROUPING_ATTRIBUTE_NAME); this.properties = Collections.unmodifiableList(properties); @@ -173,30 +205,30 @@ public class ControlRate extends AbstractProcessor { protected Collection customValidate(final ValidationContext context) { final List validationResults = new ArrayList<>(super.customValidate(context)); - final Validator rateValidator; switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { - case DATA_RATE: - rateValidator = StandardValidators.DATA_SIZE_VALIDATOR; + case DATA_OR_FLOWFILE_RATE: + // enforce validators to be sure properties are configured; they are only required for DATA_OR_FLOWFILE_RATE criteria + validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate(MAX_DATA_RATE.getDisplayName(), context.getProperty(MAX_DATA_RATE).getValue(), context)); + validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate(MAX_COUNT_RATE.getDisplayName(), context.getProperty(MAX_COUNT_RATE).getValue(), context)); break; + case DATA_RATE: + validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context)); + break; + case ATTRIBUTE_RATE: - rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); if (rateAttr == null) { validationResults.add(new ValidationResult.Builder() .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()) - .explanation(" property must be set if using of 'attribute value'") + .explanation("property must be set if using of 'attribute value'") .build()); } - break; case FLOWFILE_RATE: - default: - rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; + validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context)); + break; + default: + // no custom validation required break; - } - - final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context); - if (!rateResult.isValid()) { - validationResults.add(rateResult); } return validationResults; @@ -211,16 +243,37 @@ public class ControlRate extends AbstractProcessor { || descriptor.equals(GROUPING_ATTRIBUTE_NAME) || descriptor.equals(TIME_PERIOD)) { // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. - throttleMap.clear(); - } else if (descriptor.equals(MAX_RATE)) { + dataThrottleMap.clear(); + countThrottleMap.clear(); + } else if (descriptor.equals(MAX_RATE) || descriptor.equals(MAX_DATA_RATE)) { + // MAX_RATE could affect either throttle map; MAX_DATA_RATE only affects data throttle map final long newRate; - if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) { - newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue(); - } else { - newRate = Long.parseLong(newValue); + if (newValue != null) { + if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) { + newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue(); + } else { + newRate = Long.parseLong(newValue); + } + if (dataThrottleRequired()) { + for (final Throttle throttle : dataThrottleMap.values()) { + throttle.setMaxRate(newRate); + } + } + if (countThrottleRequired()) { + for (final Throttle throttle : countThrottleMap.values()) { + throttle.setMaxRate(newRate); + } + } } - - for (final Throttle throttle : throttleMap.values()) { + } else if (descriptor.equals(MAX_COUNT_RATE)) { + // MAX_COUNT_RATE only affects count throttle map + long newRate; + try { + newRate = Long.parseLong(newValue); + } catch (NumberFormatException nfe) { + newRate = -1; + } + for (final Throttle throttle : countThrottleMap.values()) { throttle.setMaxRate(newRate); } } @@ -230,7 +283,16 @@ public class ControlRate extends AbstractProcessor { public void onScheduled(final ProcessContext context) { rateControlCriteria = context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase(); rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); - maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase(); + if (dataThrottleRequired()) { + // Use MAX_DATA_RATE only for DATA_OR_FLOWFILE_RATE criteria + maximumRateStr = rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE) + ? context.getProperty(MAX_DATA_RATE).getValue().toUpperCase() : context.getProperty(MAX_RATE).getValue().toUpperCase(); + } + if (countThrottleRequired()) { + // Use MAX_COUNT_RATE only for DATA_OR_FLOWFILE_RATE criteria + maximumCountRateStr = rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE) + ? context.getProperty(MAX_COUNT_RATE).getValue() : context.getProperty(MAX_RATE).getValue(); + } groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue(); timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue(); } @@ -248,7 +310,14 @@ public class ControlRate extends AbstractProcessor { final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); if (lastClearTime < throttleExpirationMillis) { if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) { - final Iterator> itr = throttleMap.entrySet().iterator(); + final Set> throttleSet = new HashSet<>(); + if (dataThrottleRequired()) { + throttleSet.addAll(dataThrottleMap.entrySet()); + } + if (countThrottleRequired()) { + throttleSet.addAll(countThrottleMap.entrySet()); + } + final Iterator> itr = throttleSet.iterator(); while (itr.hasNext()) { final Map.Entry entry = itr.next(); final Throttle throttle = entry.getValue(); @@ -268,48 +337,67 @@ public class ControlRate extends AbstractProcessor { final ComponentLog logger = getLogger(); for (FlowFile flowFile : flowFiles) { // call this to capture potential error - final long accrualAmount = getFlowFileAccrual(flowFile); - if (accrualAmount < 0) { - logger.error("Routing {} to 'failure' due to missing or invalid attribute", new Object[]{flowFile}); + if (!isAccrualPossible(flowFile)) { + logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile); session.transfer(flowFile, REL_FAILURE); } else { - logger.info("transferring {} to 'success'", new Object[]{flowFile}); + logger.info("transferring {} to 'success'", flowFile); session.transfer(flowFile, REL_SUCCESS); } } } /* - * Determine the amount this FlowFile will incur against the maximum allowed rate. - * If the value returned is negative then the flowfile given is missing the required attribute - * or the attribute has an invalid value for accrual. + * Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on + * flowfile attribute, the specified attribute must be present and must be a long integer. */ - private long getFlowFileAccrual(FlowFile flowFile) { - long rateValue; - switch (rateControlCriteria) { - case DATA_RATE: - rateValue = flowFile.getSize(); - break; - case FLOWFILE_RATE: - rateValue = 1; - break; - case ATTRIBUTE_RATE: - final String attributeValue = flowFile.getAttribute(rateControlAttribute); - if (attributeValue == null) { - return -1L; - } + private boolean isAccrualPossible(FlowFile flowFile) { + if (rateControlCriteria.equals(ATTRIBUTE_RATE)) { + final String attributeValue = flowFile.getAttribute(rateControlAttribute); + return attributeValue != null && POSITIVE_LONG_PATTERN.matcher(attributeValue).matches(); + } + return true; + } - if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) { - return -1L; - } - rateValue = Long.parseLong(attributeValue); - break; - default: - throw new AssertionError(" property set to illegal value of " + rateControlCriteria); + /* + * Determine the amount this FlowFile will incur against the maximum allowed rate. + * This is applicable to data size accrual only + */ + private long getDataSizeAccrual(FlowFile flowFile) { + return flowFile.getSize(); + } + + /* + * Determine the amount this FlowFile will incur against the maximum allowed rate. + * This is applicable to counting accruals, flowfiles or attributes + */ + private long getCountAccrual(FlowFile flowFile) { + long rateValue = DEFAULT_ACCRUAL_COUNT; + if (rateControlCriteria.equals(FLOWFILE_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)) { + rateValue = 1; + } + if (rateControlCriteria.equals(ATTRIBUTE_RATE)) { + final String attributeValue = flowFile.getAttribute(rateControlAttribute); + if (attributeValue == null) { + return DEFAULT_ACCRUAL_COUNT; + } + + if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) { + return DEFAULT_ACCRUAL_COUNT; + } + rateValue = Long.parseLong(attributeValue); } return rateValue; } + private boolean dataThrottleRequired() { + return rateControlCriteria != null && (rateControlCriteria.equals(DATA_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)); + } + + private boolean countThrottleRequired() { + return rateControlCriteria != null && (rateControlCriteria.equals(FLOWFILE_RATE) || rateControlCriteria.equals(ATTRIBUTE_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)); + } + private static class Throttle extends ReentrantLock { private final AtomicLong maxRate = new AtomicLong(1L); @@ -336,6 +424,10 @@ public class ControlRate extends AbstractProcessor { } public boolean tryAdd(final long value) { + // value should never be negative, but if it is return immediately + if (value < 0) { + return false; + } final long now = System.currentTimeMillis(); if (penalizationExpired > now) { return false; @@ -346,7 +438,7 @@ public class ControlRate extends AbstractProcessor { final TimestampedLong sum = timedBuffer.getAggregateValue(timePeriodMillis); if (sum != null && sum.getValue() >= maxRateValue) { if (logger.isDebugEnabled()) { - logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", new Object[]{sum.getValue(), sum.getTimestamp(), value}); + logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", sum.getValue(), sum.getTimestamp(), value); } return false; } @@ -354,7 +446,7 @@ public class ControlRate extends AbstractProcessor { // Implement the Throttle penalization based on how much extra 'amountOver' was allowed through if (penalizationPeriod > 0) { if (logger.isDebugEnabled()) { - logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", new Object[]{penalizationPeriod}); + logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", penalizationPeriod); } penalizationExpired = now + penalizationPeriod; penalizationPeriod = 0; @@ -363,7 +455,7 @@ public class ControlRate extends AbstractProcessor { if (logger.isDebugEnabled()) { logger.debug("current sum for throttle is {} at time {}, so allowing rate of {} through", - new Object[]{sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value}); + sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value); } final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue(); @@ -374,7 +466,7 @@ public class ControlRate extends AbstractProcessor { this.penalizationPeriod = (long) (timePeriodMillis * pct); if (logger.isDebugEnabled()) { - logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod}); + logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", value, penalizationPeriod); } } @@ -394,8 +486,7 @@ public class ControlRate extends AbstractProcessor { @Override public FlowFileFilterResult filter(FlowFile flowFile) { - long accrual = getFlowFileAccrual(flowFile); - if (accrual < 0) { + if (!isAccrualPossible(flowFile)) { // this FlowFile is invalid for this configuration so let the processor deal with it return FlowFileFilterResult.ACCEPT_AND_TERMINATE; } @@ -408,34 +499,59 @@ public class ControlRate extends AbstractProcessor { groupName = DEFAULT_GROUP_ATTRIBUTE; } - Throttle throttle = throttleMap.get(groupName); - if (throttle == null) { - throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger()); + Throttle dataThrottle = dataThrottleMap.get(groupName); + Throttle countThrottle = countThrottleMap.get(groupName); - final long newRate; - if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) { - newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue(); - } else { - newRate = Long.parseLong(maximumRateStr); + boolean dataThrottlingActive = false; + if (dataThrottleRequired()) { + if (dataThrottle == null) { + dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger()); + dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue()); + dataThrottleMap.put(groupName, dataThrottle); } - throttle.setMaxRate(newRate); - throttleMap.put(groupName, throttle); + dataThrottle.lock(); + try { + if (dataThrottle.tryAdd(getDataSizeAccrual(flowFile))) { + flowFilesInBatch++; + if (flowFilesInBatch>= flowFilesPerBatch) { + flowFilesInBatch = 0; + return FlowFileFilterResult.ACCEPT_AND_TERMINATE; + } else { + // only accept flowfile if additional count throttle does not need to run + if (!countThrottleRequired()) { + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } + } + } else { + dataThrottlingActive = true; + } + } finally { + dataThrottle.unlock(); + } } - throttle.lock(); - try { - if (throttle.tryAdd(accrual)) { - flowFilesInBatch += 1; - if (flowFilesInBatch>= flowFilesPerBatch) { - flowFilesInBatch = 0; - return FlowFileFilterResult.ACCEPT_AND_TERMINATE; - } else { - return FlowFileFilterResult.ACCEPT_AND_CONTINUE; - } + // continue processing count throttle only if required and if data throttle is not already limiting flowfiles + if (countThrottleRequired() && !dataThrottlingActive) { + if (countThrottle == null) { + countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger()); + countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr)); + countThrottleMap.put(groupName, countThrottle); + } + countThrottle.lock(); + try { + if (countThrottle.tryAdd(getCountAccrual(flowFile))) { + flowFilesInBatch++; + if (flowFilesInBatch>= flowFilesPerBatch) { + flowFilesInBatch = 0; + return FlowFileFilterResult.ACCEPT_AND_TERMINATE; + } else { + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } + } + } finally { + countThrottle.unlock(); } - } finally { - throttle.unlock(); } // If we are not using a grouping attribute, then no FlowFile will be able to continue on. So we can diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html new file mode 100644 index 0000000000..7c8de2c7f5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html @@ -0,0 +1,64 @@ + + + + + + ControlRate + + + + +

This processor throttles throughput of FlowFiles based on a configured rate. The rate can be specified as either a direct data rate (bytes per time period), or by + counting FlowFiles or a specific attribute value. In all cases, the time period for measurement is specified in the Time Duration property. +

+

The processor operates in one of four available modes. The mode is determined by the Rate Control Criteria property. +

+

+ + + + + + + + + + + + + + + + + + + + + +
ModeDescription
Data RateThe FlowFile content size is accumulated for all FlowFiles passing through this processor. FlowFiles are throttled to ensure a maximum overall data rate (bytes per time period) + is not exceeded. The Maximum Rate property specifies the maximum bytes allowed per Time Duration.
FlowFile CountFlowFiles are counted regardless of content size. No more than the specified number of FlowFiles pass through this processor in the given Time Duration. The Maximum Rate property + specifies the maximum number of FlowFiles allowed per Time Duration.
Attribute ValueThe value of an attribute is accumulated to determine overall rate. The Rate Controlled Attribute property specifies the attribute whose value will be accumulated. The value of + the specified attribute is expected to be an integer. This mode is independent of overall FlowFile size and count.
Data Rate or FlowFile CountThis mode provides a combination of Data Rate and FlowFile Count. Both rates are accumulated and FlowFiles are throttled if either rate is exceeded. Both Maximum Data Rate and + Maximum FlowFile Rate properties must be specified to determine content size and FlowFile count per Time Duration.
+

+

If the Grouping Attribute property is specified, all rates are accumulated separately for unique values of the specified attribute. For example, assume Grouping Attribute property is + specified and the its value is "city". All FlowFiles containing a "city" attribute with value "Albuquerque" will have an accumulated rate calculated. A separate rate will be calculated + for all FlowFiles containing a "city" attribute with a value "Boston". In other words, separate rate calculations will be accumulated for all unique values of the Grouping Attribute. +

+ + + + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index fbecd767c4..0b68022622 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -31,6 +31,8 @@ import org.junit.jupiter.api.Test; public class TestControlRate { + private static final long ONE_SEC_PLUS = 1010L; + @Test public void testLimitExceededThenOtherLimitNotExceeded() { // If we have flowfiles queued that have different values for the "Rate Controlled Attribute" @@ -84,7 +86,7 @@ public class TestControlRate { runner.assertQueueNotEmpty(); // we have sent 3 files and after 1 second, we should be able to send the 4th - Thread.sleep(1100L); + Thread.sleep(ONE_SEC_PLUS); runner.run(); runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1); runner.assertQueueEmpty(); @@ -116,7 +118,7 @@ public class TestControlRate { runner.assertQueueNotEmpty(); // we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group - Thread.sleep(1100L); + Thread.sleep(ONE_SEC_PLUS); runner.run(2); runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2); runner.assertQueueEmpty(); @@ -145,7 +147,7 @@ public class TestControlRate { runner.assertQueueNotEmpty(); // we have sent 20 bytes and after 1 second, we should be able to send 20 more - Thread.sleep(1100L); + Thread.sleep(ONE_SEC_PLUS); runner.run(2, false); runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2); runner.assertQueueEmpty(); @@ -192,6 +194,28 @@ public class TestControlRate { runner.assertQueueEmpty(); } + @Test + public void testAttributeDoesNotExist() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE); + runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "no.such.attribute"); + runner.setProperty(ControlRate.MAX_RATE, "20000"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + + createFlowFile(runner, 1000); + createFlowFile(runner, 3000); + createFlowFile(runner, 5000); + createFlowFile(runner, 20000); + createFlowFile(runner, 1000); + + runner.run(5, false); + + // all flowfiles transfer to failure since throttling attribute is not present + runner.assertAllFlowFilesTransferred(ControlRate.REL_FAILURE, 5); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 0); + runner.assertQueueEmpty(); + } + @Test public void testBadAttributeRate() { final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); @@ -211,7 +235,7 @@ public class TestControlRate { } @Test - public void testBatchLimit() throws InterruptedException { + public void testBatchLimit() { final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); runner.setProperty(ControlRate.MAX_RATE, "5555"); @@ -240,7 +264,7 @@ public class TestControlRate { } @Test - public void testNonExistingGroupAttribute() throws InterruptedException { + public void testNonExistingGroupAttribute() { final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); runner.setProperty(ControlRate.MAX_RATE, "2"); @@ -258,10 +282,245 @@ public class TestControlRate { runner.assertQueueEmpty(); } + @Test + public void testIncreaseDataRate() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE); + runner.setProperty(ControlRate.MAX_RATE, "11 B"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + + runner.enqueue("test data 1"); + runner.enqueue("test data 2"); + runner.enqueue("test data 3"); + runner.enqueue("test data 4"); + runner.enqueue("test data 5"); + runner.enqueue("test data 6"); + + runner.run(7, true); + + runner.assertTransferCount(ControlRate.REL_SUCCESS, 1); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + + // Increase rate after stopping processor. Previous count should remain since we are still inside time period + runner.setProperty(ControlRate.MAX_RATE, "33 B"); + runner.run(7, false); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 3); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + + // after 1 second, we should be able to send the up to 3 more flowfiles + Thread.sleep(ONE_SEC_PLUS); + runner.run(7, false); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 6); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueEmpty(); + } + + @Test + public void testIncreaseFlowFileRate() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_RATE, "1"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + + runner.enqueue("test data 1"); + runner.enqueue("test data 2"); + runner.enqueue("test data 3"); + runner.enqueue("test data 4"); + runner.enqueue("test data 5"); + runner.enqueue("test data 6"); + + runner.run(7, true); + + runner.assertTransferCount(ControlRate.REL_SUCCESS, 1); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + + // Increase rate after stopping processor. Previous count should remain since we are still inside time period + runner.setProperty(ControlRate.MAX_RATE, "3"); + runner.run(7, false); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 3); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + + // after 1 second, we should be able to send the up to 3 more flowfiles + Thread.sleep(ONE_SEC_PLUS); + runner.run(7, false); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 6); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueEmpty(); + } + + @Test + public void testDataOrFlowFileCountLimitedByBytes() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + // Data rate will throttle before FlowFile count + runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B"); + runner.setProperty(ControlRate.MAX_COUNT_RATE, "3"); + + runner.enqueue("test data 1"); + runner.enqueue("test data 2"); + runner.enqueue("test data 3"); + + runner.run(4, false); + + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + runner.clearTransferState(); + + runner.run(4, false); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 0); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + // we have sent 22 bytes and after 1 second, we should be able to send 22 more + Thread.sleep(ONE_SEC_PLUS); + runner.run(4, false); + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1); + runner.assertQueueEmpty(); + } + + @Test + public void testDataOrFlowFileCountLimitedByCount() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + // FlowFile count rate will throttle before data rate + runner.setProperty(ControlRate.MAX_DATA_RATE, "44 B"); // greater than all flowfiles to be queued + runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); // limit to 1 flowfile per second + + runner.enqueue("test data 1"); + runner.enqueue("test data 2"); + runner.enqueue("test data 3"); + + runner.run(4, false); + + runner.assertTransferCount(ControlRate.REL_SUCCESS, 1); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + + // we have sent 1 flowfile and after 1 second, we should be able to send 1 more + Thread.sleep(ONE_SEC_PLUS); + runner.run(4, false); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 2); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + + // we have sent 2 flowfile and after 1 second, we should be able to send 1 more + Thread.sleep(ONE_SEC_PLUS); + runner.run(4, false); + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueEmpty(); + } + + @Test + public void testDataOrFlowFileCountLimitedByBytesThenCount() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + // Data rate will throttle before FlowFile count + runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B"); + runner.setProperty(ControlRate.MAX_COUNT_RATE, "5"); + + runner.enqueue("test data 1"); + runner.enqueue("test data 2"); + runner.enqueue("test data 3"); + runner.enqueue("4"); + runner.enqueue("5"); + runner.enqueue("6"); + runner.enqueue("7"); + runner.enqueue("8"); + + runner.run(10, false); + + runner.assertTransferCount(ControlRate.REL_SUCCESS, 2); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + runner.clearTransferState(); + + // we have sent 2 flowfile and after 1 second, we should be able to send more, now limited by flowfile count + Thread.sleep(ONE_SEC_PLUS); + runner.run(10, false); + runner.assertTransferCount(ControlRate.REL_SUCCESS, 5); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + runner.clearTransferState(); + + // after 1 second, we should be able to send the remaining flowfile + Thread.sleep(ONE_SEC_PLUS); + runner.run(10, false); + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueEmpty(); + } + + @Test + public void testValidate() { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE); + runner.assertNotValid(); // MAX_RATE is not set + runner.setProperty(ControlRate.MAX_RATE, "1"); + runner.assertNotValid(); // MAX_RATE is not a byte size + runner.setProperty(ControlRate.MAX_RATE, "1 MB"); + runner.assertValid(); + runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB"); + runner.assertValid(); // MAX_DATA_RATE is ignored + runner.removeProperty(ControlRate.MAX_RATE); + runner.assertNotValid(); // MAX_RATE is a required property for this rate control criteria + + runner.clearProperties(); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.assertNotValid(); // MAX_RATE is not set + runner.setProperty(ControlRate.MAX_RATE, "1 MB"); + runner.assertNotValid(); // MAX_RATE is not an integer + runner.setProperty(ControlRate.MAX_RATE, "1"); + runner.assertValid(); + runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); + runner.assertValid(); // MAX_COUNT_RATE is ignored + runner.removeProperty(ControlRate.MAX_RATE); + runner.assertNotValid(); // MAX_RATE is a required property for this rate control criteria + + runner.clearProperties(); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE); + runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count"); + runner.assertNotValid(); // MAX_RATE is not set + runner.setProperty(ControlRate.MAX_RATE, "1 MB"); + runner.assertNotValid(); // MAX_RATE is not an integer + runner.setProperty(ControlRate.MAX_RATE, "1"); + runner.assertValid(); + runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); + runner.assertValid(); // MAX_COUNT_RATE is ignored + runner.removeProperty(ControlRate.MAX_RATE); + runner.assertNotValid();// MAX_RATE is a required property for this rate control criteria + runner.setProperty(ControlRate.MAX_RATE, "1"); + runner.removeProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME); + runner.assertNotValid();// RATE_CONTROL_ATTRIBUTE_NAME is a required property for this rate control criteria + + runner.clearProperties(); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB"); + runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); + runner.setProperty(ControlRate.MAX_COUNT_RATE, "2"); + runner.assertValid(); // both MAX_DATA_RATE and MAX_COUNT_RATE are set + runner.removeProperty(ControlRate.MAX_COUNT_RATE); + runner.assertNotValid(); // MAX_COUNT_RATE is not set + runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); + runner.removeProperty(ControlRate.MAX_DATA_RATE); + runner.assertNotValid();// MAX_DATA_RATE is not set + runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB"); + runner.setProperty(ControlRate.MAX_RATE, "1 MB"); + runner.assertValid(); // MAX_RATE is ignored + } + private void createFlowFile(final TestRunner runner, final int value) { final Map attributeMap = new HashMap<>(); attributeMap.put("count", String.valueOf(value)); - runner.enqueue(new byte[0], attributeMap); + byte[] data = "0123456789".getBytes(); + runner.enqueue(data, attributeMap); } private void createFlowFileWithGroup(final TestRunner runner, final String group) { final Map attributeMap = new HashMap<>();