NIFI-627 removed flowfile penalization which could skew behavior when processor's Time Duration was less than Penalty Duration, improved over throttle penalization NIFI-990 corrected failure path NIFI-1329 refactored using FlowFileFilter to avoid repeatedly returning flowfiles to the input queue, producing misleading stats and excessive Tasks/Time used

Signed-off-by: joewitt <>
This commit is contained in:
Mike Moser 2016-03-10 17:53:25 -05:00 committed by joewitt
parent 7400b6f7c5
commit 0cb4047add
2 changed files with 265 additions and 121 deletions

View File

@ -37,6 +37,8 @@ import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -45,31 +47,41 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.timebuffer.EntityAccess;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
@Tags({"rate control", "throttle", "rate", "throughput"})
@CapabilityDescription("Controls the rate at which data is transferred to follow-on processors.")
@CapabilityDescription("Controls the rate at which data is transferred to follow-on processors." +
" If you configure a very small Time Duration, then the accuracy of the throttle gets worse." +
" You can improve this accuracy by decreasing the Yield Duration, at the expense of more Tasks given to the processor.")
public class ControlRate extends AbstractProcessor {
public static final String DATA_RATE = "data rate";
public static final String FLOWFILE_RATE = "flowfile count";
public static final String ATTRIBUTE_RATE = "attribute value";
public static final AllowableValue DATA_RATE_VALUE = new AllowableValue(DATA_RATE, DATA_RATE,
"Rate is controlled by counting bytes transferred per time duration.");
public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue(FLOWFILE_RATE, FLOWFILE_RATE,
"Rate is controlled by counting flowfiles transferred per time duration");
public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
"Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
.name("Rate Control Criteria")
.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
@ -90,7 +102,7 @@ public class ControlRate extends AbstractProcessor {
public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
.name("Time Duration")
.description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
.description("The amount of time to which the Maximum Rate pertains. Changing this value resets the rate counters.")
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("1 min")
@ -106,20 +118,26 @@ public class ControlRate extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("All FlowFiles are transferred to this relationship")
.description("FlowFiles are transferred to this relationship under normal conditions")
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
.description("FlowFiles will be routed to this relationship if they are missing a necessary Rate Controlled Attribute or the attribute is not in the expected format")
private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
private volatile String rateControlCriteria = null;
private volatile String rateControlAttribute = null;
private volatile String maximumRateStr = null;
private volatile String groupingAttributeName = null;
private volatile int timePeriodSeconds = 1;
protected void init(final ProcessorInitializationContext context) {
@ -133,6 +151,7 @@ public class ControlRate extends AbstractProcessor {
final Set<Relationship> relationships = new HashSet<>();
this.relationships = Collections.unmodifiableSet(relationships);
@ -191,7 +210,7 @@ public class ControlRate extends AbstractProcessor {
} else if (descriptor.equals(MAX_RATE)) {
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) {
newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(newValue);
@ -203,8 +222,24 @@ public class ControlRate extends AbstractProcessor {
public void onScheduled(final ProcessContext context) {
rateControlCriteria = context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase();
rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase();
groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue();
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
List<FlowFile> flowFiles = session.get(new ThrottleFilter());
if (flowFiles.size() == 0) {
// Periodically clear any Throttle that has not been used in more than 2 throttling periods
final long lastClearTime = lastThrottleClearTime.get();
final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
if (lastClearTime < throttleExpirationMillis) {
@ -226,17 +261,28 @@ public class ControlRate extends AbstractProcessor {
// TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
FlowFile flowFile = session.get();
if (flowFile == null) {
final ProcessorLog logger = getLogger();
for (FlowFile flowFile : flowFiles) {
try {
// call this to capture potential error
} catch (AssertionError e) {
logger.error("Routing {} to 'failure'", new Object[] { flowFile, e });
session.transfer(flowFile, REL_FAILURE);
final ProcessorLog logger = getLogger();
final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();"transferring {} to 'success'", new Object[] { flowFile });
session.transfer(flowFile, REL_SUCCESS);
* Determine the amount this FlowFile will incur against the maximum allowed rate.
private long getFlowFileAccrual(FlowFile flowFile) {
long rateValue;
switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
switch (rateControlCriteria) {
rateValue = flowFile.getSize();
@ -244,116 +290,37 @@ public class ControlRate extends AbstractProcessor {
rateValue = 1;
final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
final String attributeValue = flowFile.getAttribute(rateControlAttribute);
if (attributeValue == null) {
logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[] {flowFile, rateControlAttributeName});
session.transfer(flowFile, REL_FAILURE);
throw new AssertionError("FlowFile is missing required attribute " + rateControlAttribute);
if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
new Object[] {flowFile, rateControlAttributeName, attributeValue});
session.transfer(flowFile, REL_FAILURE);
throw new AssertionError(String.format("FlowFile attribute [%1$s] has a value of [%2$s], which is not a positive long",
rateControlAttribute, attributeValue));
rateValue = Long.parseLong(attributeValue);
throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
Throttle throttle = throttleMap.get(groupName);
if (throttle == null) {
throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
final String maxRateValue = context.getProperty(MAX_RATE).getValue();
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(maxRateValue);
throttleMap.put(groupName, throttle);
try {
if (throttle.tryAdd(rateValue)) {"transferring {} to 'success'", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
} else {
flowFile = session.penalize(flowFile);
} finally {
private static class TimestampedLong {
private final Long value;
private final long timestamp = System.currentTimeMillis();
public TimestampedLong(final Long value) {
this.value = value;
public Long getValue() {
return value;
public long getTimestamp() {
return timestamp;
private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
if (oldValue == null && toAdd == null) {
return new TimestampedLong(0L);
} else if (oldValue == null) {
return toAdd;
} else if (toAdd == null) {
return oldValue;
return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
public TimestampedLong createNew() {
return new TimestampedLong(0L);
public long getTimestamp(TimestampedLong entity) {
return entity == null ? 0L : entity.getTimestamp();
throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + rateControlCriteria);
return rateValue;
private static class Throttle extends ReentrantLock {
private final AtomicLong maxRate = new AtomicLong(1L);
private final long timePeriodValue;
private final TimeUnit timePeriodUnit;
private final long timePeriodMillis;
private final TimedBuffer<TimestampedLong> timedBuffer;
private final ProcessorLog logger;
private volatile long penalizationExpired;
private volatile long penalizationPeriod = 0;
private volatile long penalizationExpired = 0;
private volatile long lastUpdateTime;
public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
this.timePeriodUnit = unit;
this.timePeriodValue = timePeriod;
this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod, unit);
this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess());
this.logger = logger;
@ -373,28 +340,84 @@ public class ControlRate extends AbstractProcessor {
final long maxRateValue = maxRate.get();
final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
final TimestampedLong sum = timedBuffer.getAggregateValue(timePeriodMillis);
if (sum != null && sum.getValue() >= maxRateValue) {
logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[] {sum.getValue(), value});
if (logger.isDebugEnabled()) {
logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", new Object[] {sum.getValue(), sum.getTimestamp(), value});
return false;
logger.debug("current sum for throttle is {}, so allowing rate of {} through",
new Object[] {sum == null ? 0 : sum.getValue(), value});
// Implement the Throttle penalization based on how much extra 'amountOver' was allowed through
if (penalizationPeriod > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", new Object[] {penalizationPeriod});
penalizationExpired = now + penalizationPeriod;
penalizationPeriod = 0;
return false;
if (logger.isDebugEnabled()) {
logger.debug("current sum for throttle is {} at time {}, so allowing rate of {} through",
new Object[] {sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value});
final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
if (transferred > maxRateValue) {
final long amountOver = transferred - maxRateValue;
// determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
final double pct = (double) amountOver / (double) maxRateValue;
final long penalizationPeriod = (long) (milliDuration * pct);
this.penalizationExpired = now + penalizationPeriod;
this.penalizationPeriod = (long) (timePeriodMillis * pct);
if (logger.isDebugEnabled()) {
logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[] {value, penalizationPeriod});
lastUpdateTime = now;
return true;
private class ThrottleFilter implements FlowFileFilter {
public FlowFileFilterResult filter(FlowFile flowFile) {
long accrual = 1;
try {
accrual = getFlowFileAccrual(flowFile);
} catch (AssertionError e) {
// this FlowFile contains an error, so let the processor deal with it
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile
Throttle throttle = throttleMap.get(groupName);
if (throttle == null) {
throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) {
newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(maximumRateStr);
throttleMap.put(groupName, throttle);
try {
if (throttle.tryAdd(accrual)) {
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
} finally {
return FlowFileFilterResult.REJECT_AND_TERMINATE;

View File

@ -26,6 +26,96 @@ import org.junit.Test;
public class TestControlRate {
public void testFileCountRate() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "3");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.enqueue("test data 1");
runner.enqueue("test data 2");
runner.enqueue("test data 3");
runner.enqueue("test data 4");, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3);
runner.clearTransferState();, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
// we have sent 3 files and after 1 second, we should be able to send the 4th
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
public void testFileCountWithGrouping() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "2");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
createFlowFileWithGroup(runner, "one");
createFlowFileWithGroup(runner, "two");
createFlowFileWithGroup(runner, "one");
createFlowFileWithGroup(runner, "two");
createFlowFileWithGroup(runner, "one");
createFlowFileWithGroup(runner, "two");, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4);
runner.clearTransferState();, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
// we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
public void testDataSizeRate() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
runner.setProperty(ControlRate.MAX_RATE, "20 b");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
runner.enqueue("testdata 1");
runner.enqueue("testdata 2");
runner.enqueue("testdata 3");
runner.enqueue("testdata 4");, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.clearTransferState();, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
// we have sent 20 bytes and after 1 second, we should be able to send 20 more
Thread.sleep(1100L);, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
public void testViaAttribute() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
@ -35,33 +125,64 @@ public class TestControlRate {
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
createFlowFile(runner, 1000);
createFlowFile(runner, 1000);
createFlowFile(runner, 3000);
createFlowFile(runner, 5000);
createFlowFile(runner, 20000);
createFlowFile(runner, 1000);;, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4);
// at this point, we have sent through 29,000 but our max is 20,000 per second.
// After 1.45 seconds (29000 / 20000), we should be able to send another 20,000, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
// at this point, we have sent through 27,000 but our max is 20,000 per second.
// After 1 second, we should be able to send another 13,000
// at this point, more than TIME_PERIOD 1.0 seconds but less than 1.45 seconds have passed, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
// at this point, more than 1.45 seconds have passed, so we should be able to send another 20,000;
runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
public void testBadAttributeRate() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
runner.setProperty(ControlRate.MAX_RATE, "20000");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", "bad string");
runner.enqueue(new byte[0], attributeMap);;
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 1);
private void createFlowFile(final TestRunner runner, final int value) {
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", String.valueOf(value));
runner.enqueue(new byte[0], attributeMap);
private void createFlowFileWithGroup(final TestRunner runner, final String group) {
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("group", group);
runner.enqueue(new byte[0], attributeMap);