NIFI-810: Addressed several checkstyle violations

This commit is contained in:
Mark Payne 2015-10-07 17:48:51 -04:00
parent b974445ddd
commit ccfb57fe9f
6 changed files with 489 additions and 475 deletions

View File

@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.behavior;
import java.lang.annotation.Documented;
@ -21,31 +37,31 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface InputRequirement {
Requirement value();
public static enum Requirement {
/**
* This value is used to indicate that the Processor requires input from other Processors
* in order to run. As a result, the Processor will not be valid if it does not have any
* incoming connections.
*/
INPUT_REQUIRED,
/**
* This value is used to indicate that the Processor will consume data from an incoming
* connection but does not require an incoming connection in order to perform its task.
* If the {@link InputRequirement} annotation is not present, this is the default value
* that is used.
*/
INPUT_ALLOWED,
/**
* This value is used to indicate that the Processor is a "Source Processor" and does
* not accept incoming connections. Because the Processor does not pull FlowFiles from
* an incoming connection, it can be very confusing for users who create incoming connections
* to the Processor. As a result, this value can be used in order to clarify that incoming
* connections will not be used. This prevents the user from even creating such a connection.
*/
INPUT_FORBIDDEN;
}
Requirement value();
public static enum Requirement {
/**
* This value is used to indicate that the Processor requires input from other Processors
* in order to run. As a result, the Processor will not be valid if it does not have any
* incoming connections.
*/
INPUT_REQUIRED,
/**
* This value is used to indicate that the Processor will consume data from an incoming
* connection but does not require an incoming connection in order to perform its task.
* If the {@link InputRequirement} annotation is not present, this is the default value
* that is used.
*/
INPUT_ALLOWED,
/**
* This value is used to indicate that the Processor is a "Source Processor" and does
* not accept incoming connections. Because the Processor does not pull FlowFiles from
* an incoming connection, it can be very confusing for users who create incoming connections
* to the Processor. As a result, this value can be used in order to clarify that incoming
* connections will not be used. This prevents the user from even creating such a connection.
*/
INPUT_FORBIDDEN;
}
}

View File

@ -59,10 +59,8 @@ import com.amazonaws.services.s3.model.StorageClass;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
value = "The value of a User-Defined Metadata field to add to the S3 Object",
description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
supportsExpressionLanguage = true)
@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
@WritesAttributes({
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
@ -72,22 +70,22 @@ import com.amazonaws.services.s3.model.StorageClass;
public class PutS3Object extends AbstractS3Processor {
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
.name("Expiration Time Rule")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Expiration Time Rule")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
.name("Storage Class")
.required(true)
.allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
.defaultValue(StorageClass.Standard.name())
.build();
.name("Storage Class")
.required(true)
.allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
.defaultValue(StorageClass.Standard.name())
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -97,15 +95,15 @@ public class PutS3Object extends AbstractS3Processor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
@ -176,9 +174,9 @@ public class PutS3Object extends AbstractS3Processor {
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis});
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
} catch (final ProcessException | AmazonClientException pe) {
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe});
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
}
}

View File

@ -31,72 +31,72 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable {
public ProcessorNode(final Processor processor, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
super(processor, id, validationContextFactory, serviceProvider);
}
public ProcessorNode(final Processor processor, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
super(processor, id, validationContextFactory, serviceProvider);
}
public abstract boolean isIsolated();
public abstract boolean isIsolated();
public abstract boolean isTriggerWhenAnyDestinationAvailable();
public abstract boolean isTriggerWhenAnyDestinationAvailable();
@Override
public abstract boolean isSideEffectFree();
@Override
public abstract boolean isSideEffectFree();
public abstract boolean isTriggeredSerially();
public abstract boolean isTriggeredSerially();
public abstract boolean isEventDrivenSupported();
public abstract boolean isEventDrivenSupported();
public abstract boolean isHighThroughputSupported();
public abstract boolean isHighThroughputSupported();
public abstract Requirement getInputRequirement();
public abstract Requirement getInputRequirement();
@Override
public abstract boolean isValid();
@Override
public abstract boolean isValid();
public abstract void setScheduledState(ScheduledState scheduledState);
public abstract void setScheduledState(ScheduledState scheduledState);
public abstract void setBulletinLevel(LogLevel bulletinLevel);
public abstract void setBulletinLevel(LogLevel bulletinLevel);
public abstract LogLevel getBulletinLevel();
public abstract LogLevel getBulletinLevel();
public abstract Processor getProcessor();
public abstract Processor getProcessor();
public abstract void yield(long period, TimeUnit timeUnit);
public abstract void yield(long period, TimeUnit timeUnit);
public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
public abstract Set<Relationship> getAutoTerminatedRelationships();
public abstract Set<Relationship> getAutoTerminatedRelationships();
public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
@Override
public abstract SchedulingStrategy getSchedulingStrategy();
@Override
public abstract SchedulingStrategy getSchedulingStrategy();
public abstract void setRunDuration(long duration, TimeUnit timeUnit);
public abstract void setRunDuration(long duration, TimeUnit timeUnit);
public abstract long getRunDuration(TimeUnit timeUnit);
public abstract long getRunDuration(TimeUnit timeUnit);
public abstract Map<String, String> getStyle();
public abstract Map<String, String> getStyle();
public abstract void setStyle(Map<String, String> style);
public abstract void setStyle(Map<String, String> style);
/**
* @return the number of threads (concurrent tasks) currently being used by
* this Processor
*/
public abstract int getActiveThreadCount();
/**
* @return the number of threads (concurrent tasks) currently being used by
* this Processor
*/
public abstract int getActiveThreadCount();
/**
* Verifies that this Processor can be started if the provided set of
* services are enabled. This is introduced because we need to verify that
* all components can be started before starting any of them. In order to do
* that, we need to know that this component can be started if the given
* services are enabled, as we will then enable the given services before
* starting this component.
*
* @param ignoredReferences to ignore
*/
public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
/**
* Verifies that this Processor can be started if the provided set of
* services are enabled. This is introduced because we need to verify that
* all components can be started before starting any of them. In order to do
* that, we need to know that this component can be started if the given
* services are enabled, as we will then enable the given services before
* starting this component.
*
* @param ignoredReferences to ignore
*/
public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
}

View File

@ -1306,9 +1306,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
@Override
public void verifyModifiable() throws IllegalStateException {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
}>>>>>>>2215 bc848b7db395b2ca9ac7cc4dc10891393721
public void verifyModifiable() throws IllegalStateException {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
}
}

View File

@ -56,99 +56,99 @@ import org.apache.nifi.util.StopWatch;
@InputRequirement(Requirement.INPUT_REQUIRED)
public class Base64EncodeContent extends AbstractProcessor {
public static final String ENCODE_MODE = "Encode";
public static final String DECODE_MODE = "Decode";
public static final String ENCODE_MODE = "Encode";
public static final String DECODE_MODE = "Decode";
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Specifies whether the content should be encoded or decoded")
.required(true)
.allowableValues(ENCODE_MODE, DECODE_MODE)
.defaultValue(ENCODE_MODE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully encoded or decoded will be routed to success")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
.build();
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Specifies whether the content should be encoded or decoded")
.required(true)
.allowableValues(ENCODE_MODE, DECODE_MODE)
.defaultValue(ENCODE_MODE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully encoded or decoded will be routed to success")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(MODE);
this.properties = Collections.unmodifiableList(properties);
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(MODE);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ProcessorLog logger = getLogger();
final ProcessorLog logger = getLogger();
boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
try {
final StopWatch stopWatch = new StopWatch(true);
if (encode) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
try (Base64OutputStream bos = new Base64OutputStream(out)) {
int len = -1;
byte[] buf = new byte[8192];
while ((len = in.read(buf)) > 0) {
bos.write(buf, 0, len);
}
bos.flush();
}
}
});
} else {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
int len = -1;
byte[] buf = new byte[8192];
while ((len = bis.read(buf)) > 0) {
out.write(buf, 0, len);
}
out.flush();
}
}
});
}
boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
try {
final StopWatch stopWatch = new StopWatch(true);
if (encode) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
try (Base64OutputStream bos = new Base64OutputStream(out)) {
int len = -1;
byte[] buf = new byte[8192];
while ((len = in.read(buf)) > 0) {
bos.write(buf, 0, len);
}
bos.flush();
}
}
});
} else {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
int len = -1;
byte[] buf = new byte[8192];
while ((len = bis.read(buf)) > 0) {
out.write(buf, 0, len);
}
out.flush();
}
}
});
}
logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (ProcessException e) {
logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
}
logger.info("Successfully {} {}", new Object[] {encode ? "encoded" : "decoded", flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (ProcessException e) {
logger.error("Failed to {} {} due to {}", new Object[] {encode ? "encode" : "decode", flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -61,340 +61,340 @@ import org.apache.nifi.util.timebuffer.TimedBuffer;
@CapabilityDescription("Controls the rate at which data is transferred to follow-on processors.")
public class ControlRate extends AbstractProcessor {
public static final String DATA_RATE = "data rate";
public static final String FLOWFILE_RATE = "flowfile count";
public static final String ATTRIBUTE_RATE = "attribute value";
public static final String DATA_RATE = "data rate";
public static final String FLOWFILE_RATE = "flowfile count";
public static final String ATTRIBUTE_RATE = "attribute value";
public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
.name("Rate Control Criteria")
.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
.required(true)
.allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
.defaultValue(DATA_RATE)
.build();
public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
.name("Maximum Rate")
.description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+ "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
.build();
public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Rate Controlled Attribute")
.description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
+ "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
+ "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
.name("Time Duration")
.description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
.required(true)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("1 min")
.build();
public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Grouping Attribute")
.description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
+ "each value specified by the attribute with this name. Changing this value resets the rate counters.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
.name("Rate Control Criteria")
.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
.required(true)
.allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
.defaultValue(DATA_RATE)
.build();
public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
.name("Maximum Rate")
.description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+ "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
.build();
public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Rate Controlled Attribute")
.description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
+ "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
+ "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
.name("Time Duration")
.description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
.required(true)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("1 min")
.build();
public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Grouping Attribute")
.description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
+ "each value specified by the attribute with this name. Changing this value resets the rate counters.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are transferred to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are transferred to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
.build();
private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(RATE_CONTROL_CRITERIA);
properties.add(MAX_RATE);
properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
properties.add(TIME_PERIOD);
properties.add(GROUPING_ATTRIBUTE_NAME);
this.properties = Collections.unmodifiableList(properties);
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(RATE_CONTROL_CRITERIA);
properties.add(MAX_RATE);
properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
properties.add(TIME_PERIOD);
properties.add(GROUPING_ATTRIBUTE_NAME);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
final Validator rateValidator;
switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
case DATA_RATE:
rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
break;
case ATTRIBUTE_RATE:
rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
if (rateAttr == null) {
validationResults.add(new ValidationResult.Builder()
.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
.explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
.build());
}
break;
case FLOWFILE_RATE:
default:
rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
break;
}
final Validator rateValidator;
switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
case DATA_RATE:
rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
break;
case ATTRIBUTE_RATE:
rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
if (rateAttr == null) {
validationResults.add(new ValidationResult.Builder()
.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
.explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
.build());
}
break;
case FLOWFILE_RATE:
default:
rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
break;
}
final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
if (!rateResult.isValid()) {
validationResults.add(rateResult);
}
final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
if (!rateResult.isValid()) {
validationResults.add(rateResult);
}
return validationResults;
}
return validationResults;
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
if (descriptor.equals(RATE_CONTROL_CRITERIA)
|| descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
|| descriptor.equals(GROUPING_ATTRIBUTE_NAME)
|| descriptor.equals(TIME_PERIOD)) {
// if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
throttleMap.clear();
} else if (descriptor.equals(MAX_RATE)) {
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(newValue);
}
if (descriptor.equals(RATE_CONTROL_CRITERIA)
|| descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
|| descriptor.equals(GROUPING_ATTRIBUTE_NAME)
|| descriptor.equals(TIME_PERIOD)) {
// if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
throttleMap.clear();
} else if (descriptor.equals(MAX_RATE)) {
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(newValue);
}
for (final Throttle throttle : throttleMap.values()) {
throttle.setMaxRate(newRate);
}
}
}
for (final Throttle throttle : throttleMap.values()) {
throttle.setMaxRate(newRate);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final long lastClearTime = lastThrottleClearTime.get();
final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
if (lastClearTime < throttleExpirationMillis) {
if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
while (itr.hasNext()) {
final Map.Entry<String, Throttle> entry = itr.next();
final Throttle throttle = entry.getValue();
if (throttle.tryLock()) {
try {
if (throttle.lastUpdateTime() < lastClearTime) {
itr.remove();
}
} finally {
throttle.unlock();
}
}
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final long lastClearTime = lastThrottleClearTime.get();
final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
if (lastClearTime < throttleExpirationMillis) {
if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
while (itr.hasNext()) {
final Map.Entry<String, Throttle> entry = itr.next();
final Throttle throttle = entry.getValue();
if (throttle.tryLock()) {
try {
if (throttle.lastUpdateTime() < lastClearTime) {
itr.remove();
}
} finally {
throttle.unlock();
}
}
}
}
}
// TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
// TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ProcessorLog logger = getLogger();
final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
long rateValue;
switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
case DATA_RATE:
rateValue = flowFile.getSize();
break;
case FLOWFILE_RATE:
rateValue = 1;
break;
case ATTRIBUTE_RATE:
final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
if (attributeValue == null) {
logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName});
session.transfer(flowFile, REL_FAILURE);
return;
}
final ProcessorLog logger = getLogger();
final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
long rateValue;
switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
case DATA_RATE:
rateValue = flowFile.getSize();
break;
case FLOWFILE_RATE:
rateValue = 1;
break;
case ATTRIBUTE_RATE:
final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
if (attributeValue == null) {
logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[] {flowFile, rateControlAttributeName});
session.transfer(flowFile, REL_FAILURE);
return;
}
if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
new Object[]{flowFile, rateControlAttributeName, attributeValue});
session.transfer(flowFile, REL_FAILURE);
return;
}
rateValue = Long.parseLong(attributeValue);
break;
default:
throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
}
if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
new Object[] {flowFile, rateControlAttributeName, attributeValue});
session.transfer(flowFile, REL_FAILURE);
return;
}
rateValue = Long.parseLong(attributeValue);
break;
default:
throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
}
final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
Throttle throttle = throttleMap.get(groupName);
if (throttle == null) {
throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
Throttle throttle = throttleMap.get(groupName);
if (throttle == null) {
throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
final String maxRateValue = context.getProperty(MAX_RATE).getValue();
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(maxRateValue);
}
throttle.setMaxRate(newRate);
final String maxRateValue = context.getProperty(MAX_RATE).getValue();
final long newRate;
if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
} else {
newRate = Long.parseLong(maxRateValue);
}
throttle.setMaxRate(newRate);
throttleMap.put(groupName, throttle);
}
throttleMap.put(groupName, throttle);
}
throttle.lock();
try {
if (throttle.tryAdd(rateValue)) {
logger.info("transferring {} to 'success'", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
} else {
flowFile = session.penalize(flowFile);
session.transfer(flowFile);
}
} finally {
throttle.unlock();
}
}
throttle.lock();
try {
if (throttle.tryAdd(rateValue)) {
logger.info("transferring {} to 'success'", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
} else {
flowFile = session.penalize(flowFile);
session.transfer(flowFile);
}
} finally {
throttle.unlock();
}
}
private static class TimestampedLong {
private static class TimestampedLong {
private final Long value;
private final long timestamp = System.currentTimeMillis();
private final Long value;
private final long timestamp = System.currentTimeMillis();
public TimestampedLong(final Long value) {
this.value = value;
}
public TimestampedLong(final Long value) {
this.value = value;
}
public Long getValue() {
return value;
}
public Long getValue() {
return value;
}
public long getTimestamp() {
return timestamp;
}
}
public long getTimestamp() {
return timestamp;
}
}
private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
@Override
public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
if (oldValue == null && toAdd == null) {
return new TimestampedLong(0L);
} else if (oldValue == null) {
return toAdd;
} else if (toAdd == null) {
return oldValue;
}
@Override
public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
if (oldValue == null && toAdd == null) {
return new TimestampedLong(0L);
} else if (oldValue == null) {
return toAdd;
} else if (toAdd == null) {
return oldValue;
}
return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
}
return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
}
@Override
public TimestampedLong createNew() {
return new TimestampedLong(0L);
}
@Override
public TimestampedLong createNew() {
return new TimestampedLong(0L);
}
@Override
public long getTimestamp(TimestampedLong entity) {
return entity == null ? 0L : entity.getTimestamp();
}
}
@Override
public long getTimestamp(TimestampedLong entity) {
return entity == null ? 0L : entity.getTimestamp();
}
}
private static class Throttle extends ReentrantLock {
private static class Throttle extends ReentrantLock {
private final AtomicLong maxRate = new AtomicLong(1L);
private final long timePeriodValue;
private final TimeUnit timePeriodUnit;
private final TimedBuffer<TimestampedLong> timedBuffer;
private final ProcessorLog logger;
private final AtomicLong maxRate = new AtomicLong(1L);
private final long timePeriodValue;
private final TimeUnit timePeriodUnit;
private final TimedBuffer<TimestampedLong> timedBuffer;
private final ProcessorLog logger;
private volatile long penalizationExpired;
private volatile long lastUpdateTime;
private volatile long penalizationExpired;
private volatile long lastUpdateTime;
public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
this.timePeriodUnit = unit;
this.timePeriodValue = timePeriod;
this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
this.logger = logger;
}
public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
this.timePeriodUnit = unit;
this.timePeriodValue = timePeriod;
this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
this.logger = logger;
}
public void setMaxRate(final long maxRate) {
this.maxRate.set(maxRate);
}
public void setMaxRate(final long maxRate) {
this.maxRate.set(maxRate);
}
public long lastUpdateTime() {
return lastUpdateTime;
}
public long lastUpdateTime() {
return lastUpdateTime;
}
public boolean tryAdd(final long value) {
final long now = System.currentTimeMillis();
if (penalizationExpired > now) {
return false;
}
public boolean tryAdd(final long value) {
final long now = System.currentTimeMillis();
if (penalizationExpired > now) {
return false;
}
final long maxRateValue = maxRate.get();
final long maxRateValue = maxRate.get();
final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
if (sum != null && sum.getValue() >= maxRateValue) {
logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value});
return false;
}
final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
if (sum != null && sum.getValue() >= maxRateValue) {
logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[] {sum.getValue(), value});
return false;
}
logger.debug("current sum for throttle is {}, so allowing rate of {} through",
new Object[]{sum == null ? 0 : sum.getValue(), value});
logger.debug("current sum for throttle is {}, so allowing rate of {} through",
new Object[] {sum == null ? 0 : sum.getValue(), value});
final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
if (transferred > maxRateValue) {
final long amountOver = transferred - maxRateValue;
// determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
final double pct = (double) amountOver / (double) maxRateValue;
final long penalizationPeriod = (long) (milliDuration * pct);
this.penalizationExpired = now + penalizationPeriod;
logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
}
final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
if (transferred > maxRateValue) {
final long amountOver = transferred - maxRateValue;
// determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
final double pct = (double) amountOver / (double) maxRateValue;
final long penalizationPeriod = (long) (milliDuration * pct);
this.penalizationExpired = now + penalizationPeriod;
logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[] {value, penalizationPeriod});
}
lastUpdateTime = now;
return true;
}
}
lastUpdateTime = now;
return true;
}
}
}