diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index ef62eee94f..4bbcc23f3f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -33,9 +34,9 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; @@ -45,7 +46,6 @@ import org.apache.nifi.util.timebuffer.TimestampedLong; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -66,6 +66,52 @@ import java.util.regex.Pattern; @CapabilityDescription("Controls the 
rate at which data is transferred to follow-on processors." + " If you configure a very small Time Duration, then the accuracy of the throttle gets worse." + " You can improve this accuracy by decreasing the Yield Duration, at the expense of more Tasks given to the processor.") +@UseCase(description = "Limit the rate at which data is sent to a downstream system with little to no bursts", + keywords = {"throttle", "limit", "slow down", "data rate"}, + configuration = """ + Set the "Rate Control Criteria" to `data rate`. + Set the "Time Duration" property to `1 sec`. + Configure the "Maximum Rate" property to specify how much data should be allowed through each second. + + For example, to allow through 8 MB per second, set "Maximum Rate" to `8 MB`. + """ +) +@UseCase(description = "Limit the rate at which FlowFiles are sent to a downstream system with little to no bursts", + keywords = {"throttle", "limit", "slow down", "flowfile rate"}, + configuration = """ + Set the "Rate Control Criteria" to `flowfile count`. + Set the "Time Duration" property to `1 sec`. + Configure the "Maximum Rate" property to specify how many FlowFiles should be allowed through each second. + + For example, to allow through 100 FlowFiles per second, set "Maximum Rate" to `100`. + """ +) +@UseCase(description = "Reject requests that exceed a specific rate with little to no bursts", + keywords = {"throttle", "limit", "slow down", "request rate"}, + configuration = """ + Set the "Rate Control Criteria" to `flowfile count`. + Set the "Time Duration" property to `1 sec`. + Set the "Rate Exceeded Strategy" property to `Route to 'rate exceeded'`. + Configure the "Maximum Rate" property to specify how many requests should be allowed through each second. + + For example, to allow through 100 requests per second, set "Maximum Rate" to `100`. + If more than 100 requests come in during any one second, the additional requests will be routed to `rate exceeded` instead of `success`. 
+ """ +) +@UseCase(description = "Reject requests that exceed a specific rate, allowing for bursts", + keywords = {"throttle", "limit", "slow down", "request rate"}, + configuration = """ + Set the "Rate Control Criteria" to `flowfile count`. + Set the "Time Duration" property to `1 min`. + Set the "Rate Exceeded Strategy" property to `Route to 'rate exceeded'`. + Configure the "Maximum Rate" property to specify how many requests should be allowed through each minute. + + For example, to allow through 100 requests per second, set "Maximum Rate" to `6000`. + This will allow through 6,000 FlowFiles per minute, which averages to 100 FlowFiles per second. However, those 6,000 FlowFiles may come all within the first couple of + seconds, or they may come in over a period of 60 seconds. As a result, this gives us an average rate of 100 FlowFiles per second but allows for bursts of data. + If more than 6,000 requests come in during any one minute, the additional requests will be routed to `rate exceeded` instead of `success`. 
+ """ +) public class ControlRate extends AbstractProcessor { public static final String DATA_RATE = "data rate"; @@ -82,6 +128,11 @@ public class ControlRate extends AbstractProcessor { public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE, "Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced"); + static final AllowableValue HOLD_FLOWFILE = new AllowableValue("Hold FlowFile", "Hold FlowFile", + "The FlowFile will be held in its input queue until the rate of data has fallen below the configured maximum and will then be allowed through."); + static final AllowableValue ROUTE_TO_RATE_EXCEEDED = new AllowableValue("Route to 'rate exceeded'", "Route to 'rate exceeded'", + "The FlowFile will be routed to the 'rate exceeded' Relationship."); + // based on testing to balance commits and 10,000 FF swap limit public static final int MAX_FLOW_FILES_PER_BATCH = 1000; private static final long DEFAULT_ACCRUAL_COUNT = -1L; @@ -123,6 +174,14 @@ public class ControlRate extends AbstractProcessor { .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE) .build(); + public static final PropertyDescriptor RATE_EXCEEDED_STRATEGY = new PropertyDescriptor.Builder() + .name("Rate Exceeded Strategy") + .description("Specifies how to handle an incoming FlowFile when the maximum data rate has been exceeded.") + .required(true) + .allowableValues(HOLD_FLOWFILE, ROUTE_TO_RATE_EXCEEDED) + .defaultValue(HOLD_FLOWFILE.getValue()) + .build(); + public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() .name("Rate Controlled Attribute") .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. 
" @@ -149,20 +208,37 @@ public class ControlRate extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder() + static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("FlowFiles are transferred to this relationship under normal conditions") .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() + static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("FlowFiles will be routed to this relationship if they are missing a necessary Rate Controlled Attribute or the attribute is not in the expected format") .build(); + static final Relationship REL_RATE_EXCEEDED = new Relationship.Builder() + .name("rate exceeded") + .description("A FlowFile will be routed to this Relationship if it results in exceeding the maximum threshold allowed based on the Processor's configuration and if the Rate Exceeded " + + "Strategy is configured to use this Relationship.") + .build(); private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*"); private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###"; - private List properties; - private Set relationships; + private static final List properties = List.of( + RATE_CONTROL_CRITERIA, + TIME_PERIOD, + MAX_RATE, + MAX_DATA_RATE, + MAX_COUNT_RATE, + RATE_EXCEEDED_STRATEGY, + RATE_CONTROL_ATTRIBUTE_NAME, + GROUPING_ATTRIBUTE_NAME + ); + + private static final Set defaultRelationships = Set.of(REL_SUCCESS, REL_FAILURE); + private static final Set rateExceededRelationships = Set.of(REL_SUCCESS, REL_FAILURE, REL_RATE_EXCEEDED); + private volatile Set relationships = defaultRelationships; private final ConcurrentMap dataThrottleMap = new ConcurrentHashMap<>(); private final ConcurrentMap countThrottleMap = new ConcurrentHashMap<>(); @@ 
-174,23 +250,6 @@ public class ControlRate extends AbstractProcessor { private volatile String groupingAttributeName = null; private volatile int timePeriodSeconds = 1; - @Override - protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(RATE_CONTROL_CRITERIA); - properties.add(TIME_PERIOD); - properties.add(MAX_RATE); - properties.add(MAX_DATA_RATE); - properties.add(MAX_COUNT_RATE); - properties.add(RATE_CONTROL_ATTRIBUTE_NAME); - properties.add(GROUPING_ATTRIBUTE_NAME); - this.properties = Collections.unmodifiableList(properties); - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); - } @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -239,6 +298,14 @@ public class ControlRate extends AbstractProcessor { public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { super.onPropertyModified(descriptor, oldValue, newValue); + if (descriptor.equals(RATE_EXCEEDED_STRATEGY)) { + if (ROUTE_TO_RATE_EXCEEDED.getValue().equalsIgnoreCase(newValue)) { + this.relationships = rateExceededRelationships; + } else { + this.relationships = defaultRelationships; + } + } + if (descriptor.equals(RATE_CONTROL_CRITERIA) || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) || descriptor.equals(GROUPING_ATTRIBUTE_NAME) @@ -300,12 +367,63 @@ public class ControlRate extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis)); + final String strategy = context.getProperty(RATE_EXCEEDED_STRATEGY).getValue(); + if (ROUTE_TO_RATE_EXCEEDED.getValue().equalsIgnoreCase(strategy)) { + routeFlowFilesExceedingRate(context, session); + } else { 
+ holdFlowFilesExceedingRate(context, session); + } + } + + private void routeFlowFilesExceedingRate(final ProcessContext context, final ProcessSession session) { + clearExpiredThrottles(context); + + final List<FlowFile> flowFiles = session.get(MAX_FLOW_FILES_PER_BATCH); if (flowFiles.isEmpty()) { context.yield(); return; } + final ThrottleFilter filter = new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis); + for (final FlowFile flowFile : flowFiles) { + final Relationship relationship; + if (!isRateAttributeValid(flowFile)) { + relationship = REL_FAILURE; + } else { + final FlowFileFilterResult result = filter.filter(flowFile); + relationship = result.isAccept() ? REL_SUCCESS : REL_RATE_EXCEEDED; + } + + session.transfer(flowFile, relationship); + getLogger().info("Routing {} to {}", flowFile, relationship.getName()); + session.getProvenanceReporter().route(flowFile, relationship); + } + } + + + private void holdFlowFilesExceedingRate(final ProcessContext context, final ProcessSession session) { + clearExpiredThrottles(context); + + final List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis)); + if (flowFiles.isEmpty()) { + context.yield(); + return; + } + + final ComponentLog logger = getLogger(); + for (FlowFile flowFile : flowFiles) { + // call this to capture potential error + if (isRateAttributeValid(flowFile)) { + logger.info("transferring {} to 'success'", flowFile); + session.transfer(flowFile, REL_SUCCESS); + } else { + logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + } + + private void clearExpiredThrottles(final ProcessContext context) { // Periodically clear any Throttle that has not been used in more than 2 throttling periods final long lastClearTime = lastThrottleClearTime.get(); final long throttleExpirationMillis = getCurrentTimeMillis() - 2 * 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); @@ -334,18 +452,6 @@ public class ControlRate extends AbstractProcessor { } } } - - final ComponentLog logger = getLogger(); - for (FlowFile flowFile : flowFiles) { - // call this to capture potential error - if (isAccrualPossible(flowFile)) { - logger.info("transferring {} to 'success'", flowFile); - session.transfer(flowFile, REL_SUCCESS); - } else { - logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } } /** @@ -361,7 +467,7 @@ public class ControlRate extends AbstractProcessor { * Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on * flowfile attribute, the specified attribute must be present and must be a long integer. */ - private boolean isAccrualPossible(FlowFile flowFile) { + private boolean isRateAttributeValid(FlowFile flowFile) { if (rateControlCriteria.equals(ATTRIBUTE_RATE)) { final String attributeValue = flowFile.getAttribute(rateControlAttribute); return attributeValue != null && POSITIVE_LONG_PATTERN.matcher(attributeValue).matches(); @@ -500,7 +606,7 @@ public class ControlRate extends AbstractProcessor { @Override public FlowFileFilterResult filter(FlowFile flowFile) { - if (!isAccrualPossible(flowFile)) { + if (!isRateAttributeValid(flowFile)) { // this FlowFile is invalid for this configuration so let the processor deal with it return FlowFileFilterResult.ACCEPT_AND_TERMINATE; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index eb33a7c9df..76fbbfe377 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -16,19 +16,19 @@ */ package org.apache.nifi.processors.standard; -import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH; -import static org.junit.jupiter.api.Assertions.assertEquals; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH; +import static org.junit.jupiter.api.Assertions.assertEquals; public class TestControlRate { @@ -46,6 +46,35 @@ public class TestControlRate { runner = TestRunners.newTestRunner(controlRate); } + @Test + public void testRouteToRateExceeded() { + runner.setProperty(ControlRate.RATE_EXCEEDED_STRATEGY, ControlRate.ROUTE_TO_RATE_EXCEEDED.getValue()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_RATE, "10"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 min"); + runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group"); + + for (int i = 0; i < 100; i++) { + final Map<String, String> attributes = Collections.singletonMap("group", Integer.toString(i)); + runner.enqueue("", attributes); + } + + for (int i = 0; i < 25; i++) { + final Map<String, String> attributes = Collections.singletonMap("group", "50"); + runner.enqueue("", attributes); + } + + 
runner.run(); + + // The first 100 should all go to success, as should the next 9 (as that's a total of 10 for group '50'). + runner.assertTransferCount(ControlRate.REL_SUCCESS, 109); + + // The rest should all go to 'rate exceeded' + runner.assertTransferCount(ControlRate.REL_RATE_EXCEEDED, 16); + final List<MockFlowFile> exceededFlowFiles = runner.getFlowFilesForRelationship(ControlRate.REL_RATE_EXCEEDED); + exceededFlowFiles.forEach(ff -> ff.assertAttributeEquals("group", "50")); + } + @Test public void testLimitExceededThenOtherLimitNotExceeded() { // If we have flowfiles queued that have different values for the "Rate Controlled Attribute"