NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

NIFI-10243: fix typos

NIFI-10243: re-ordered property in ControlRate

NIFI-10243: minor updates to make code cleaner based on PR comments

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #6506.
This commit is contained in:
Mark Bean 2022-10-10 15:00:25 -04:00 committed by Nathan Gough
parent e8774979de
commit 2bfefc3e5b
3 changed files with 546 additions and 107 deletions

View File

@ -16,21 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -42,7 +27,6 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@ -59,6 +43,21 @@ import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@SideEffectFree
@TriggerSerially
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -71,30 +70,58 @@ public class ControlRate extends AbstractProcessor {
public static final String DATA_RATE = "data rate";
public static final String FLOWFILE_RATE = "flowfile count";
public static final String ATTRIBUTE_RATE = "attribute value";
public static final String DATA_OR_FLOWFILE_RATE = "data rate or flowfile count";
public static final AllowableValue DATA_RATE_VALUE = new AllowableValue(DATA_RATE, DATA_RATE,
"Rate is controlled by counting bytes transferred per time duration.");
public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue(FLOWFILE_RATE, FLOWFILE_RATE,
"Rate is controlled by counting flowfiles transferred per time duration");
"Rate is controlled by counting FlowFiles transferred per time duration");
public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
"Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE,
"Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced");
// based on testing to balance commits and 10,000 FF swap limit
public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
private static final long DEFAULT_ACCRUAL_COUNT = -1L;
public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
.name("Rate Control Criteria")
.displayName("Rate Control Criteria")
.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
.required(true)
.allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
.allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE, DATA_OR_FLOWFILE_RATE_VALUE)
.defaultValue(DATA_RATE)
.build();
public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
.name("Maximum Rate")
.displayName("Maximum Rate")
.description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+ "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
.required(true)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
.dependsOn(RATE_CONTROL_CRITERIA, DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
.build();
public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
.name("Maximum Data Rate")
.displayName("Maximum Data Rate")
.description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+ "Data Size (such as '1 MB') representing bytes per Time Duration.")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
.build();
public static final PropertyDescriptor MAX_COUNT_RATE = new PropertyDescriptor.Builder()
.name("Maximum FlowFile Rate")
.displayName("Maximum FlowFile Rate")
.description("The maximum rate at which FlowFiles should pass through this processor. The format of this property is expected to be a "
+ "positive integer representing FlowFiles count per Time Duration")
.required(false)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
.build();
public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Rate Controlled Attribute")
.description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
@ -103,6 +130,7 @@ public class ControlRate extends AbstractProcessor {
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.dependsOn(RATE_CONTROL_CRITERIA, ATTRIBUTE_RATE)
.build();
public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
.name("Time Duration")
@ -135,11 +163,13 @@ public class ControlRate extends AbstractProcessor {
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Throttle> dataThrottleMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Throttle> countThrottleMap = new ConcurrentHashMap<>();
private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
private volatile String rateControlCriteria = null;
private volatile String rateControlAttribute = null;
private volatile String maximumRateStr = null;
private volatile String maximumCountRateStr = null;
private volatile String groupingAttributeName = null;
private volatile int timePeriodSeconds = 1;
@ -147,9 +177,11 @@ public class ControlRate extends AbstractProcessor {
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(RATE_CONTROL_CRITERIA);
properties.add(MAX_RATE);
properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
properties.add(TIME_PERIOD);
properties.add(MAX_RATE);
properties.add(MAX_DATA_RATE);
properties.add(MAX_COUNT_RATE);
properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
properties.add(GROUPING_ATTRIBUTE_NAME);
this.properties = Collections.unmodifiableList(properties);
@ -173,30 +205,30 @@ public class ControlRate extends AbstractProcessor {
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
final Validator rateValidator;
switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
case DATA_RATE:
rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
case DATA_OR_FLOWFILE_RATE:
// enforce validators to be sure properties are configured; they are only required for DATA_OR_FLOWFILE_RATE criteria
validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate(MAX_DATA_RATE.getDisplayName(), context.getProperty(MAX_DATA_RATE).getValue(), context));
validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate(MAX_COUNT_RATE.getDisplayName(), context.getProperty(MAX_COUNT_RATE).getValue(), context));
break;
case DATA_RATE:
validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context));
break;
case ATTRIBUTE_RATE:
rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
if (rateAttr == null) {
validationResults.add(new ValidationResult.Builder()
.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
.explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
.explanation("property must be set if using <Rate Control Criteria> of 'attribute value'")
.build());
}
break;
case FLOWFILE_RATE:
default:
rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context));
break;
default:
// no custom validation required
break;
}
final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
if (!rateResult.isValid()) {
validationResults.add(rateResult);
}
return validationResults;
@ -211,16 +243,37 @@ public class ControlRate extends AbstractProcessor {
|| descriptor.equals(GROUPING_ATTRIBUTE_NAME)
|| descriptor.equals(TIME_PERIOD)) {
// if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
throttleMap.clear();
} else if (descriptor.equals(MAX_RATE)) {
dataThrottleMap.clear();
countThrottleMap.clear();
} else if (descriptor.equals(MAX_RATE) || descriptor.equals(MAX_DATA_RATE)) {
// MAX_RATE could affect either throttle map; MAX_DATA_RATE only affects data throttle map
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) {
newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(newValue);
if (newValue != null) {
if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) {
newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(newValue);
}
if (dataThrottleRequired()) {
for (final Throttle throttle : dataThrottleMap.values()) {
throttle.setMaxRate(newRate);
}
}
if (countThrottleRequired()) {
for (final Throttle throttle : countThrottleMap.values()) {
throttle.setMaxRate(newRate);
}
}
}
for (final Throttle throttle : throttleMap.values()) {
} else if (descriptor.equals(MAX_COUNT_RATE)) {
// MAX_COUNT_RATE only affects count throttle map
long newRate;
try {
newRate = Long.parseLong(newValue);
} catch (NumberFormatException nfe) {
newRate = -1;
}
for (final Throttle throttle : countThrottleMap.values()) {
throttle.setMaxRate(newRate);
}
}
@ -230,7 +283,16 @@ public class ControlRate extends AbstractProcessor {
public void onScheduled(final ProcessContext context) {
rateControlCriteria = context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase();
rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase();
if (dataThrottleRequired()) {
// Use MAX_DATA_RATE only for DATA_OR_FLOWFILE_RATE criteria
maximumRateStr = rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)
? context.getProperty(MAX_DATA_RATE).getValue().toUpperCase() : context.getProperty(MAX_RATE).getValue().toUpperCase();
}
if (countThrottleRequired()) {
// Use MAX_COUNT_RATE only for DATA_OR_FLOWFILE_RATE criteria
maximumCountRateStr = rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)
? context.getProperty(MAX_COUNT_RATE).getValue() : context.getProperty(MAX_RATE).getValue();
}
groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue();
}
@ -248,7 +310,14 @@ public class ControlRate extends AbstractProcessor {
final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
if (lastClearTime < throttleExpirationMillis) {
if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
final Set<Map.Entry<String, Throttle>> throttleSet = new HashSet<>();
if (dataThrottleRequired()) {
throttleSet.addAll(dataThrottleMap.entrySet());
}
if (countThrottleRequired()) {
throttleSet.addAll(countThrottleMap.entrySet());
}
final Iterator<Map.Entry<String, Throttle>> itr = throttleSet.iterator();
while (itr.hasNext()) {
final Map.Entry<String, Throttle> entry = itr.next();
final Throttle throttle = entry.getValue();
@ -268,48 +337,67 @@ public class ControlRate extends AbstractProcessor {
final ComponentLog logger = getLogger();
for (FlowFile flowFile : flowFiles) {
// call this to capture potential error
final long accrualAmount = getFlowFileAccrual(flowFile);
if (accrualAmount < 0) {
logger.error("Routing {} to 'failure' due to missing or invalid attribute", new Object[]{flowFile});
if (!isAccrualPossible(flowFile)) {
logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
session.transfer(flowFile, REL_FAILURE);
} else {
logger.info("transferring {} to 'success'", new Object[]{flowFile});
logger.info("transferring {} to 'success'", flowFile);
session.transfer(flowFile, REL_SUCCESS);
}
}
}
/*
* Determine the amount this FlowFile will incur against the maximum allowed rate.
* If the value returned is negative then the flowfile given is missing the required attribute
* or the attribute has an invalid value for accrual.
* Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on
* flowfile attribute, the specified attribute must be present and must be a long integer.
*/
private long getFlowFileAccrual(FlowFile flowFile) {
long rateValue;
switch (rateControlCriteria) {
case DATA_RATE:
rateValue = flowFile.getSize();
break;
case FLOWFILE_RATE:
rateValue = 1;
break;
case ATTRIBUTE_RATE:
final String attributeValue = flowFile.getAttribute(rateControlAttribute);
if (attributeValue == null) {
return -1L;
}
private boolean isAccrualPossible(FlowFile flowFile) {
if (rateControlCriteria.equals(ATTRIBUTE_RATE)) {
final String attributeValue = flowFile.getAttribute(rateControlAttribute);
return attributeValue != null && POSITIVE_LONG_PATTERN.matcher(attributeValue).matches();
}
return true;
}
if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
return -1L;
}
rateValue = Long.parseLong(attributeValue);
break;
default:
throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + rateControlCriteria);
/*
* Determine the amount this FlowFile will incur against the maximum allowed rate.
* This is applicable to data size accrual only
*/
private long getDataSizeAccrual(FlowFile flowFile) {
return flowFile.getSize();
}
/*
* Determine the amount this FlowFile will incur against the maximum allowed rate.
* This is applicable to counting accruals, flowfiles or attributes
*/
private long getCountAccrual(FlowFile flowFile) {
long rateValue = DEFAULT_ACCRUAL_COUNT;
if (rateControlCriteria.equals(FLOWFILE_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)) {
rateValue = 1;
}
if (rateControlCriteria.equals(ATTRIBUTE_RATE)) {
final String attributeValue = flowFile.getAttribute(rateControlAttribute);
if (attributeValue == null) {
return DEFAULT_ACCRUAL_COUNT;
}
if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
return DEFAULT_ACCRUAL_COUNT;
}
rateValue = Long.parseLong(attributeValue);
}
return rateValue;
}
private boolean dataThrottleRequired() {
return rateControlCriteria != null && (rateControlCriteria.equals(DATA_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE));
}
private boolean countThrottleRequired() {
return rateControlCriteria != null && (rateControlCriteria.equals(FLOWFILE_RATE) || rateControlCriteria.equals(ATTRIBUTE_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE));
}
private static class Throttle extends ReentrantLock {
private final AtomicLong maxRate = new AtomicLong(1L);
@ -336,6 +424,10 @@ public class ControlRate extends AbstractProcessor {
}
public boolean tryAdd(final long value) {
// value should never be negative, but if it is return immediately
if (value < 0) {
return false;
}
final long now = System.currentTimeMillis();
if (penalizationExpired > now) {
return false;
@ -346,7 +438,7 @@ public class ControlRate extends AbstractProcessor {
final TimestampedLong sum = timedBuffer.getAggregateValue(timePeriodMillis);
if (sum != null && sum.getValue() >= maxRateValue) {
if (logger.isDebugEnabled()) {
logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", new Object[]{sum.getValue(), sum.getTimestamp(), value});
logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", sum.getValue(), sum.getTimestamp(), value);
}
return false;
}
@ -354,7 +446,7 @@ public class ControlRate extends AbstractProcessor {
// Implement the Throttle penalization based on how much extra 'amountOver' was allowed through
if (penalizationPeriod > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", new Object[]{penalizationPeriod});
logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", penalizationPeriod);
}
penalizationExpired = now + penalizationPeriod;
penalizationPeriod = 0;
@ -363,7 +455,7 @@ public class ControlRate extends AbstractProcessor {
if (logger.isDebugEnabled()) {
logger.debug("current sum for throttle is {} at time {}, so allowing rate of {} through",
new Object[]{sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value});
sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value);
}
final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
@ -374,7 +466,7 @@ public class ControlRate extends AbstractProcessor {
this.penalizationPeriod = (long) (timePeriodMillis * pct);
if (logger.isDebugEnabled()) {
logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", value, penalizationPeriod);
}
}
@ -394,8 +486,7 @@ public class ControlRate extends AbstractProcessor {
@Override
public FlowFileFilterResult filter(FlowFile flowFile) {
long accrual = getFlowFileAccrual(flowFile);
if (accrual < 0) {
if (!isAccrualPossible(flowFile)) {
// this FlowFile is invalid for this configuration so let the processor deal with it
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
}
@ -408,34 +499,59 @@ public class ControlRate extends AbstractProcessor {
groupName = DEFAULT_GROUP_ATTRIBUTE;
}
Throttle throttle = throttleMap.get(groupName);
if (throttle == null) {
throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
Throttle dataThrottle = dataThrottleMap.get(groupName);
Throttle countThrottle = countThrottleMap.get(groupName);
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) {
newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(maximumRateStr);
boolean dataThrottlingActive = false;
if (dataThrottleRequired()) {
if (dataThrottle == null) {
dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
dataThrottleMap.put(groupName, dataThrottle);
}
throttle.setMaxRate(newRate);
throttleMap.put(groupName, throttle);
dataThrottle.lock();
try {
if (dataThrottle.tryAdd(getDataSizeAccrual(flowFile))) {
flowFilesInBatch++;
if (flowFilesInBatch>= flowFilesPerBatch) {
flowFilesInBatch = 0;
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
} else {
// only accept flowfile if additional count throttle does not need to run
if (!countThrottleRequired()) {
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
}
}
} else {
dataThrottlingActive = true;
}
} finally {
dataThrottle.unlock();
}
}
throttle.lock();
try {
if (throttle.tryAdd(accrual)) {
flowFilesInBatch += 1;
if (flowFilesInBatch>= flowFilesPerBatch) {
flowFilesInBatch = 0;
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
} else {
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
}
// continue processing count throttle only if required and if data throttle is not already limiting flowfiles
if (countThrottleRequired() && !dataThrottlingActive) {
if (countThrottle == null) {
countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr));
countThrottleMap.put(groupName, countThrottle);
}
countThrottle.lock();
try {
if (countThrottle.tryAdd(getCountAccrual(flowFile))) {
flowFilesInBatch++;
if (flowFilesInBatch>= flowFilesPerBatch) {
flowFilesInBatch = 0;
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
} else {
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
}
}
} finally {
countThrottle.unlock();
}
} finally {
throttle.unlock();
}
// If we are not using a grouping attribute, then no FlowFile will be able to continue on. So we can

View File

@ -0,0 +1,64 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ControlRate</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>This processor throttles throughput of FlowFiles based on a configured rate. The rate can be specified as either a direct data rate (bytes per time period), or by
counting FlowFiles or a specific attribute value. In all cases, the time period for measurement is specified in the Time Duration property.
</p>
<p>The processor operates in one of four available modes. The mode is determined by the Rate Control Criteria property.
</p>
<p>
<table>
<tr>
<th>Mode</th>
<th>Description</th>
</tr>
<tr>
<td>Data Rate</td>
<td>The FlowFile content size is accumulated for all FlowFiles passing through this processor. FlowFiles are throttled to ensure a maximum overall data rate (bytes per time period)
is not exceeded. The Maximum Rate property specifies the maximum bytes allowed per Time Duration.</td>
</tr>
<tr>
<td>FlowFile Count</td>
<td>FlowFiles are counted regardless of content size. No more than the specified number of FlowFiles pass through this processor in the given Time Duration. The Maximum Rate property
specifies the maximum number of FlowFiles allowed per Time Duration.</td>
</tr>
<tr>
<td>Attribute Value</td>
<td>The value of an attribute is accumulated to determine overall rate. The Rate Controlled Attribute property specifies the attribute whose value will be accumulated. The value of
the specified attribute is expected to be an integer. This mode is independent of overall FlowFile size and count.</td>
</tr>
<tr>
<td>Data Rate or FlowFile Count</td>
<td>This mode provides a combination of Data Rate and FlowFile Count. Both rates are accumulated and FlowFiles are throttled if either rate is exceeded. Both Maximum Data Rate and
Maximum FlowFile Rate properties must be specified to determine content size and FlowFile count per Time Duration.</td>
</tr>
</table>
</p>
<p>If the Grouping Attribute property is specified, all rates are accumulated separately for unique values of the specified attribute. For example, assume Grouping Attribute property is
specified and the its value is "city". All FlowFiles containing a "city" attribute with value "Albuquerque" will have an accumulated rate calculated. A separate rate will be calculated
for all FlowFiles containing a "city" attribute with a value "Boston". In other words, separate rate calculations will be accumulated for all unique values of the Grouping Attribute.
</p>
</body>
</html>

View File

@ -31,6 +31,8 @@ import org.junit.jupiter.api.Test;
public class TestControlRate {
private static final long ONE_SEC_PLUS = 1010L;
@Test
public void testLimitExceededThenOtherLimitNotExceeded() {
// If we have flowfiles queued that have different values for the "Rate Controlled Attribute"
@ -84,7 +86,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 3 files and after 1 second, we should be able to send the 4th
Thread.sleep(1100L);
Thread.sleep(ONE_SEC_PLUS);
runner.run();
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
runner.assertQueueEmpty();
@ -116,7 +118,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group
Thread.sleep(1100L);
Thread.sleep(ONE_SEC_PLUS);
runner.run(2);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.assertQueueEmpty();
@ -145,7 +147,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 20 bytes and after 1 second, we should be able to send 20 more
Thread.sleep(1100L);
Thread.sleep(ONE_SEC_PLUS);
runner.run(2, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.assertQueueEmpty();
@ -192,6 +194,28 @@ public class TestControlRate {
runner.assertQueueEmpty();
}
@Test
public void testAttributeDoesNotExist() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "no.such.attribute");
runner.setProperty(ControlRate.MAX_RATE, "20000");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
createFlowFile(runner, 1000);
createFlowFile(runner, 3000);
createFlowFile(runner, 5000);
createFlowFile(runner, 20000);
createFlowFile(runner, 1000);
runner.run(5, false);
// all flowfiles transfer to failure since throttling attribute is not present
runner.assertAllFlowFilesTransferred(ControlRate.REL_FAILURE, 5);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertQueueEmpty();
}
@Test
public void testBadAttributeRate() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
@ -211,7 +235,7 @@ public class TestControlRate {
}
@Test
public void testBatchLimit() throws InterruptedException {
public void testBatchLimit() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "5555");
@ -240,7 +264,7 @@ public class TestControlRate {
}
@Test
public void testNonExistingGroupAttribute() throws InterruptedException {
public void testNonExistingGroupAttribute() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "2");
@ -258,10 +282,245 @@ public class TestControlRate {
runner.assertQueueEmpty();
}
@Test
public void testIncreaseDataRate() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
runner.setProperty(ControlRate.MAX_RATE, "11 B");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.enqueue("test data 1");
runner.enqueue("test data 2");
runner.enqueue("test data 3");
runner.enqueue("test data 4");
runner.enqueue("test data 5");
runner.enqueue("test data 6");
runner.run(7, true);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// Increase rate after stopping processor. Previous count should remain since we are still inside time period
runner.setProperty(ControlRate.MAX_RATE, "33 B");
runner.run(7, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// after 1 second, we should be able to send the up to 3 more flowfiles
Thread.sleep(ONE_SEC_PLUS);
runner.run(7, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}
@Test
public void testIncreaseFlowFileRate() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "1");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.enqueue("test data 1");
runner.enqueue("test data 2");
runner.enqueue("test data 3");
runner.enqueue("test data 4");
runner.enqueue("test data 5");
runner.enqueue("test data 6");
runner.run(7, true);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// Increase rate after stopping processor. Previous count should remain since we are still inside time period
runner.setProperty(ControlRate.MAX_RATE, "3");
runner.run(7, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// after 1 second, we should be able to send the up to 3 more flowfiles
Thread.sleep(ONE_SEC_PLUS);
runner.run(7, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}
@Test
public void testDataOrFlowFileCountLimitedByBytes() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
// Data rate will throttle before FlowFile count
runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
runner.setProperty(ControlRate.MAX_COUNT_RATE, "3");
runner.enqueue("test data 1");
runner.enqueue("test data 2");
runner.enqueue("test data 3");
runner.run(4, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
runner.clearTransferState();
runner.run(4, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// we have sent 22 bytes and after 1 second, we should be able to send 22 more
Thread.sleep(ONE_SEC_PLUS);
runner.run(4, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
runner.assertQueueEmpty();
}
@Test
public void testDataOrFlowFileCountLimitedByCount() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
// FlowFile count rate will throttle before data rate
runner.setProperty(ControlRate.MAX_DATA_RATE, "44 B"); // greater than all flowfiles to be queued
runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); // limit to 1 flowfile per second
runner.enqueue("test data 1");
runner.enqueue("test data 2");
runner.enqueue("test data 3");
runner.run(4, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// we have sent 1 flowfile and after 1 second, we should be able to send 1 more
Thread.sleep(ONE_SEC_PLUS);
runner.run(4, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// we have sent 2 flowfile and after 1 second, we should be able to send 1 more
Thread.sleep(ONE_SEC_PLUS);
runner.run(4, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}
@Test
public void testDataOrFlowFileCountLimitedByBytesThenCount() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
// Data rate will throttle before FlowFile count
runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
runner.setProperty(ControlRate.MAX_COUNT_RATE, "5");
runner.enqueue("test data 1");
runner.enqueue("test data 2");
runner.enqueue("test data 3");
runner.enqueue("4");
runner.enqueue("5");
runner.enqueue("6");
runner.enqueue("7");
runner.enqueue("8");
runner.run(10, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
runner.clearTransferState();
// we have sent 2 flowfile and after 1 second, we should be able to send more, now limited by flowfile count
Thread.sleep(ONE_SEC_PLUS);
runner.run(10, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 5);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
runner.clearTransferState();
// after 1 second, we should be able to send the remaining flowfile
Thread.sleep(ONE_SEC_PLUS);
runner.run(10, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}
@Test
public void testValidate() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
runner.assertNotValid(); // MAX_RATE is not set
runner.setProperty(ControlRate.MAX_RATE, "1");
runner.assertNotValid(); // MAX_RATE is not a byte size
runner.setProperty(ControlRate.MAX_RATE, "1 MB");
runner.assertValid();
runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB");
runner.assertValid(); // MAX_DATA_RATE is ignored
runner.removeProperty(ControlRate.MAX_RATE);
runner.assertNotValid(); // MAX_RATE is a required property for this rate control criteria
runner.clearProperties();
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.assertNotValid(); // MAX_RATE is not set
runner.setProperty(ControlRate.MAX_RATE, "1 MB");
runner.assertNotValid(); // MAX_RATE is not an integer
runner.setProperty(ControlRate.MAX_RATE, "1");
runner.assertValid();
runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
runner.assertValid(); // MAX_COUNT_RATE is ignored
runner.removeProperty(ControlRate.MAX_RATE);
runner.assertNotValid(); // MAX_RATE is a required property for this rate control criteria
runner.clearProperties();
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
runner.assertNotValid(); // MAX_RATE is not set
runner.setProperty(ControlRate.MAX_RATE, "1 MB");
runner.assertNotValid(); // MAX_RATE is not an integer
runner.setProperty(ControlRate.MAX_RATE, "1");
runner.assertValid();
runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
runner.assertValid(); // MAX_COUNT_RATE is ignored
runner.removeProperty(ControlRate.MAX_RATE);
runner.assertNotValid();// MAX_RATE is a required property for this rate control criteria
runner.setProperty(ControlRate.MAX_RATE, "1");
runner.removeProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME);
runner.assertNotValid();// RATE_CONTROL_ATTRIBUTE_NAME is a required property for this rate control criteria
runner.clearProperties();
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB");
runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
runner.setProperty(ControlRate.MAX_COUNT_RATE, "2");
runner.assertValid(); // both MAX_DATA_RATE and MAX_COUNT_RATE are set
runner.removeProperty(ControlRate.MAX_COUNT_RATE);
runner.assertNotValid(); // MAX_COUNT_RATE is not set
runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
runner.removeProperty(ControlRate.MAX_DATA_RATE);
runner.assertNotValid();// MAX_DATA_RATE is not set
runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB");
runner.setProperty(ControlRate.MAX_RATE, "1 MB");
runner.assertValid(); // MAX_RATE is ignored
}
private void createFlowFile(final TestRunner runner, final int value) {
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", String.valueOf(value));
runner.enqueue(new byte[0], attributeMap);
byte[] data = "0123456789".getBytes();
runner.enqueue(data, attributeMap);
}
private void createFlowFileWithGroup(final TestRunner runner, final String group) {
final Map<String, String> attributeMap = new HashMap<>();