From ccfb57fe9ff43f11319dcb1625bfc78b1d88f56a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 7 Oct 2015 17:48:51 -0400 Subject: [PATCH] NIFI-810: Addressed several checkstyle violations --- .../annotation/behavior/InputRequirement.java | 70 ++- .../nifi/processors/aws/s3/PutS3Object.java | 46 +- .../apache/nifi/controller/ProcessorNode.java | 88 +-- .../controller/StandardProcessorNode.java | 10 +- .../standard/Base64EncodeContent.java | 168 ++--- .../nifi/processors/standard/ControlRate.java | 582 +++++++++--------- 6 files changed, 489 insertions(+), 475 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java index 97e6b88b8d..13f442c18e 100644 --- a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.annotation.behavior; import java.lang.annotation.Documented; @@ -21,31 +37,31 @@ import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface InputRequirement { - Requirement value(); - - public static enum Requirement { - /** - * This value is used to indicate that the Processor requires input from other Processors - * in order to run. As a result, the Processor will not be valid if it does not have any - * incoming connections. - */ - INPUT_REQUIRED, - - /** - * This value is used to indicate that the Processor will consume data from an incoming - * connection but does not require an incoming connection in order to perform its task. - * If the {@link InputRequirement} annotation is not present, this is the default value - * that is used. - */ - INPUT_ALLOWED, - - /** - * This value is used to indicate that the Processor is a "Source Processor" and does - * not accept incoming connections. Because the Processor does not pull FlowFiles from - * an incoming connection, it can be very confusing for users who create incoming connections - * to the Processor. As a result, this value can be used in order to clarify that incoming - * connections will not be used. This prevents the user from even creating such a connection. - */ - INPUT_FORBIDDEN; - } + Requirement value(); + + public static enum Requirement { + /** + * This value is used to indicate that the Processor requires input from other Processors + * in order to run. As a result, the Processor will not be valid if it does not have any + * incoming connections. + */ + INPUT_REQUIRED, + + /** + * This value is used to indicate that the Processor will consume data from an incoming + * connection but does not require an incoming connection in order to perform its task. + * If the {@link InputRequirement} annotation is not present, this is the default value + * that is used. + */ + INPUT_ALLOWED, + + /** + * This value is used to indicate that the Processor is a "Source Processor" and does + * not accept incoming connections. Because the Processor does not pull FlowFiles from + * an incoming connection, it can be very confusing for users who create incoming connections + * to the Processor. As a result, this value can be used in order to clarify that incoming + * connections will not be used. This prevents the user from even creating such a connection. + */ + INPUT_FORBIDDEN; + } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 7398c4eb63..c7212f5c67 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -59,10 +59,8 @@ import com.amazonaws.services.s3.model.StorageClass; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Amazon", "S3", "AWS", "Archive", "Put"}) @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket") -@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", - value = "The value of a User-Defined Metadata field to add to the S3 Object", - description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", - supportsExpressionLanguage = true) +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object", + description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true) @ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object") @WritesAttributes({ @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), @@ -72,22 +70,22 @@ import com.amazonaws.services.s3.model.StorageClass; public class PutS3Object extends AbstractS3Processor { public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() - .name("Expiration Time Rule") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Expiration Time Rule") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() - .name("Storage Class") - .required(true) - .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name()) - .defaultValue(StorageClass.Standard.name()) - .build(); + .name("Storage Class") + .required(true) + .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name()) + .defaultValue(StorageClass.Standard.name()) + .build(); public static final List properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, - FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER)); + Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER)); @Override protected List getSupportedPropertyDescriptors() { @@ -97,15 +95,15 @@ public class PutS3Object extends AbstractS3Processor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .dynamic(true) - .build(); + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); } @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { + public void onTrigger(final ProcessContext context, final ProcessSession session) { FlowFile flowFile = session.get(); if (flowFile == null) { return; @@ -176,9 +174,9 @@ public class PutS3Object extends AbstractS3Processor { final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, url, millis); - getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis}); + getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis}); } catch (final ProcessException | AmazonClientException pe) { - getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe}); + getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe}); session.transfer(flowFile, REL_FAILURE); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 2f72d0f6d4..d340c77415 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -31,72 +31,72 @@ import org.apache.nifi.scheduling.SchedulingStrategy; public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable { - public ProcessorNode(final Processor processor, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { - super(processor, id, validationContextFactory, serviceProvider); - } + public ProcessorNode(final Processor processor, final String id, + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { + super(processor, id, validationContextFactory, serviceProvider); + } - public abstract boolean isIsolated(); + public abstract boolean isIsolated(); - public abstract boolean isTriggerWhenAnyDestinationAvailable(); + public abstract boolean isTriggerWhenAnyDestinationAvailable(); - @Override - public abstract boolean isSideEffectFree(); + @Override + public abstract boolean isSideEffectFree(); - public abstract boolean isTriggeredSerially(); + public abstract boolean isTriggeredSerially(); - public abstract boolean isEventDrivenSupported(); + public abstract boolean isEventDrivenSupported(); - public abstract boolean isHighThroughputSupported(); + public abstract boolean isHighThroughputSupported(); - public abstract Requirement getInputRequirement(); + public abstract Requirement getInputRequirement(); - @Override - public abstract boolean isValid(); + @Override + public abstract boolean isValid(); - public abstract void setScheduledState(ScheduledState scheduledState); + public abstract void setScheduledState(ScheduledState scheduledState); - public abstract void setBulletinLevel(LogLevel bulletinLevel); + public abstract void setBulletinLevel(LogLevel bulletinLevel); - public abstract LogLevel getBulletinLevel(); + public abstract LogLevel getBulletinLevel(); - public abstract Processor getProcessor(); + public abstract Processor getProcessor(); - public abstract void yield(long period, TimeUnit timeUnit); + public abstract void yield(long period, TimeUnit timeUnit); - public abstract void setAutoTerminatedRelationships(Set relationships); + public abstract void setAutoTerminatedRelationships(Set relationships); - public abstract Set getAutoTerminatedRelationships(); + public abstract Set getAutoTerminatedRelationships(); - public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy); + public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy); - @Override - public abstract SchedulingStrategy getSchedulingStrategy(); + @Override + public abstract SchedulingStrategy getSchedulingStrategy(); - public abstract void setRunDuration(long duration, TimeUnit timeUnit); + public abstract void setRunDuration(long duration, TimeUnit timeUnit); - public abstract long getRunDuration(TimeUnit timeUnit); + public abstract long getRunDuration(TimeUnit timeUnit); - public abstract Map getStyle(); + public abstract Map getStyle(); - public abstract void setStyle(Map style); + public abstract void setStyle(Map style); - /** - * @return the number of threads (concurrent tasks) currently being used by - * this Processor - */ - public abstract int getActiveThreadCount(); + /** + * @return the number of threads (concurrent tasks) currently being used by + * this Processor + */ + public abstract int getActiveThreadCount(); - /** - * Verifies that this Processor can be started if the provided set of - * services are enabled. This is introduced because we need to verify that - * all components can be started before starting any of them. In order to do - * that, we need to know that this component can be started if the given - * services are enabled, as we will then enable the given services before - * starting this component. - * - * @param ignoredReferences to ignore - */ - public abstract void verifyCanStart(Set ignoredReferences); + /** + * Verifies that this Processor can be started if the provided set of + * services are enabled. This is introduced because we need to verify that + * all components can be started before starting any of them. In order to do + * that, we need to know that this component can be started if the given + * services are enabled, as we will then enable the given services before + * starting this component. + * + * @param ignoredReferences to ignore + */ + public abstract void verifyCanStart(Set ignoredReferences); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index f69c510a09..ad22c6de37 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1306,9 +1306,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void verifyModifiable() throws IllegalStateException { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - }>>>>>>>2215 bc848b7db395b2ca9ac7cc4dc10891393721 + public void verifyModifiable() throws IllegalStateException { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java index 816b4079ed..db45109c0c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java @@ -56,99 +56,99 @@ import org.apache.nifi.util.StopWatch; @InputRequirement(Requirement.INPUT_REQUIRED) public class Base64EncodeContent extends AbstractProcessor { - public static final String ENCODE_MODE = "Encode"; - public static final String DECODE_MODE = "Decode"; + public static final String ENCODE_MODE = "Encode"; + public static final String DECODE_MODE = "Decode"; - public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() - .name("Mode") - .description("Specifies whether the content should be encoded or decoded") - .required(true) - .allowableValues(ENCODE_MODE, DECODE_MODE) - .defaultValue(ENCODE_MODE) - .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully encoded or decoded will be routed to success") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be encoded or decoded will be routed to failure") - .build(); + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .description("Specifies whether the content should be encoded or decoded") + .required(true) + .allowableValues(ENCODE_MODE, DECODE_MODE) + .defaultValue(ENCODE_MODE) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully encoded or decoded will be routed to success") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be encoded or decoded will be routed to failure") + .build(); - private List properties; - private Set relationships; + private List properties; + private Set relationships; - @Override - protected void init(final ProcessorInitializationContext context) { - final List properties = new ArrayList<>(); - properties.add(MODE); - this.properties = Collections.unmodifiableList(properties); + @Override + protected void init(final ProcessorInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(MODE); + this.properties = Collections.unmodifiableList(properties); - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); - } + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } - @Override - public Set getRelationships() { - return relationships; - } + @Override + public Set getRelationships() { + return relationships; + } - @Override - protected List getSupportedPropertyDescriptors() { - return properties; - } + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } - final ProcessorLog logger = getLogger(); + final ProcessorLog logger = getLogger(); - boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE); - try { - final StopWatch stopWatch = new StopWatch(true); - if (encode) { - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(InputStream in, OutputStream out) throws IOException { - try (Base64OutputStream bos = new Base64OutputStream(out)) { - int len = -1; - byte[] buf = new byte[8192]; - while ((len = in.read(buf)) > 0) { - bos.write(buf, 0, len); - } - bos.flush(); - } - } - }); - } else { - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(InputStream in, OutputStream out) throws IOException { - try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) { - int len = -1; - byte[] buf = new byte[8192]; - while ((len = bis.read(buf)) > 0) { - out.write(buf, 0, len); - } - out.flush(); - } - } - }); - } + boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE); + try { + final StopWatch stopWatch = new StopWatch(true); + if (encode) { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (Base64OutputStream bos = new Base64OutputStream(out)) { + int len = -1; + byte[] buf = new byte[8192]; + while ((len = in.read(buf)) > 0) { + bos.write(buf, 0, len); + } + bos.flush(); + } + } + }); + } else { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) { + int len = -1; + byte[] buf = new byte[8192]; + while ((len = bis.read(buf)) > 0) { + out.write(buf, 0, len); + } + out.flush(); + } + } + }); + } - logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile}); - session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); - } catch (ProcessException e) { - logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - } - } + logger.info("Successfully {} {}", new Object[] {encode ? "encoded" : "decoded", flowFile}); + session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } catch (ProcessException e) { + logger.error("Failed to {} {} due to {}", new Object[] {encode ? "encode" : "decode", flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index a45c211281..0847472bae 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -61,340 +61,340 @@ import org.apache.nifi.util.timebuffer.TimedBuffer; @CapabilityDescription("Controls the rate at which data is transferred to follow-on processors.") public class ControlRate extends AbstractProcessor { - public static final String DATA_RATE = "data rate"; - public static final String FLOWFILE_RATE = "flowfile count"; - public static final String ATTRIBUTE_RATE = "attribute value"; + public static final String DATA_RATE = "data rate"; + public static final String FLOWFILE_RATE = "flowfile count"; + public static final String ATTRIBUTE_RATE = "attribute value"; - public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder() - .name("Rate Control Criteria") - .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.") - .required(true) - .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE) - .defaultValue(DATA_RATE) - .build(); - public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder() - .name("Maximum Rate") - .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a " - + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria - .build(); - public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() - .name("Rate Controlled Attribute") - .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. " - + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. " - + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder() - .name("Time Duration") - .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.") - .required(true) - .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) - .defaultValue("1 min") - .build(); - public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() - .name("Grouping Attribute") - .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for " - + "each value specified by the attribute with this name. Changing this value resets the rate counters.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder() + .name("Rate Control Criteria") + .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.") + .required(true) + .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE) + .defaultValue(DATA_RATE) + .build(); + public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder() + .name("Maximum Rate") + .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a " + + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria + .build(); + public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("Rate Controlled Attribute") + .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. " + + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. " + + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder() + .name("Time Duration") + .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.") + .required(true) + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .defaultValue("1 min") + .build(); + public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("Grouping Attribute") + .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for " + + "each value specified by the attribute with this name. Changing this value resets the rate counters.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles are transferred to this relationship") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format") - .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles are transferred to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format") + .build(); - private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*"); - private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###"; + private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*"); + private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###"; - private final ConcurrentMap throttleMap = new ConcurrentHashMap<>(); - private List properties; - private Set relationships; - private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis()); + private final ConcurrentMap throttleMap = new ConcurrentHashMap<>(); + private List properties; + private Set relationships; + private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis()); - @Override - protected void init(final ProcessorInitializationContext context) { - final List properties = new ArrayList<>(); - properties.add(RATE_CONTROL_CRITERIA); - properties.add(MAX_RATE); - properties.add(RATE_CONTROL_ATTRIBUTE_NAME); - properties.add(TIME_PERIOD); - properties.add(GROUPING_ATTRIBUTE_NAME); - this.properties = Collections.unmodifiableList(properties); + @Override + protected void init(final ProcessorInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(RATE_CONTROL_CRITERIA); + properties.add(MAX_RATE); + properties.add(RATE_CONTROL_ATTRIBUTE_NAME); + properties.add(TIME_PERIOD); + properties.add(GROUPING_ATTRIBUTE_NAME); + this.properties = Collections.unmodifiableList(properties); - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); - } + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } - @Override - protected List getSupportedPropertyDescriptors() { - return properties; - } + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } - @Override - public Set getRelationships() { - return relationships; - } + @Override + public Set getRelationships() { + return relationships; + } - @Override - protected Collection customValidate(final ValidationContext context) { - final List validationResults = new ArrayList<>(super.customValidate(context)); + @Override + protected Collection customValidate(final ValidationContext context) { + final List validationResults = new ArrayList<>(super.customValidate(context)); - final Validator rateValidator; - switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { - case DATA_RATE: - rateValidator = StandardValidators.DATA_SIZE_VALIDATOR; - break; - case ATTRIBUTE_RATE: - rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; - final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); - if (rateAttr == null) { - validationResults.add(new ValidationResult.Builder() - .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()) - .explanation(" property must be set if using of 'attribute value'") - .build()); - } - break; - case FLOWFILE_RATE: - default: - rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; - break; - } + final Validator rateValidator; + switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { + case DATA_RATE: + rateValidator = StandardValidators.DATA_SIZE_VALIDATOR; + break; + case ATTRIBUTE_RATE: + rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; + final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); + if (rateAttr == null) { + validationResults.add(new ValidationResult.Builder() + .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()) + .explanation(" property must be set if using of 'attribute value'") + .build()); + } + break; + case FLOWFILE_RATE: + default: + rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; + break; + } - final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context); - if (!rateResult.isValid()) { - validationResults.add(rateResult); - } + final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context); + if (!rateResult.isValid()) { + validationResults.add(rateResult); + } - return validationResults; - } + return validationResults; + } - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - super.onPropertyModified(descriptor, oldValue, newValue); + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); - if (descriptor.equals(RATE_CONTROL_CRITERIA) - || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) - || descriptor.equals(GROUPING_ATTRIBUTE_NAME) - || descriptor.equals(TIME_PERIOD)) { - // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. - throttleMap.clear(); - } else if (descriptor.equals(MAX_RATE)) { - final long newRate; - if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) { - newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue(); - } else { - newRate = Long.parseLong(newValue); - } + if (descriptor.equals(RATE_CONTROL_CRITERIA) + || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) + || descriptor.equals(GROUPING_ATTRIBUTE_NAME) + || descriptor.equals(TIME_PERIOD)) { + // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. + throttleMap.clear(); + } else if (descriptor.equals(MAX_RATE)) { + final long newRate; + if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) { + newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue(); + } else { + newRate = Long.parseLong(newValue); + } - for (final Throttle throttle : throttleMap.values()) { - throttle.setMaxRate(newRate); - } - } - } + for (final Throttle throttle : throttleMap.values()) { + throttle.setMaxRate(newRate); + } + } + } - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final long lastClearTime = lastThrottleClearTime.get(); - final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); - if (lastClearTime < throttleExpirationMillis) { - if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) { - final Iterator> itr = throttleMap.entrySet().iterator(); - while (itr.hasNext()) { - final Map.Entry entry = itr.next(); - final Throttle throttle = entry.getValue(); - if (throttle.tryLock()) { - try { - if (throttle.lastUpdateTime() < lastClearTime) { - itr.remove(); - } - } finally { - throttle.unlock(); - } - } - } - } - } + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final long lastClearTime = lastThrottleClearTime.get(); + final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); + if (lastClearTime < throttleExpirationMillis) { + if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) { + final Iterator> itr = throttleMap.entrySet().iterator(); + while (itr.hasNext()) { + final Map.Entry entry = itr.next(); + final Throttle throttle = entry.getValue(); + if (throttle.tryLock()) { + try { + if (throttle.lastUpdateTime() < lastClearTime) { + itr.remove(); + } + } finally { + throttle.unlock(); + } + } + } + } + } - // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } + // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } - final ProcessorLog logger = getLogger(); - final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS); - final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); - long rateValue; - switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { - case DATA_RATE: - rateValue = flowFile.getSize(); - break; - case FLOWFILE_RATE: - rateValue = 1; - break; - case ATTRIBUTE_RATE: - final String attributeValue = flowFile.getAttribute(rateControlAttributeName); - if (attributeValue == null) { - logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName}); - session.transfer(flowFile, REL_FAILURE); - return; - } + final ProcessorLog logger = getLogger(); + final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS); + final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); + long rateValue; + switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { + case DATA_RATE: + rateValue = flowFile.getSize(); + break; + case FLOWFILE_RATE: + rateValue = 1; + break; + case ATTRIBUTE_RATE: + final String attributeValue = flowFile.getAttribute(rateControlAttributeName); + if (attributeValue == null) { + logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[] {flowFile, rateControlAttributeName}); + session.transfer(flowFile, REL_FAILURE); + return; + } - if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) { - logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long", - new Object[]{flowFile, rateControlAttributeName, attributeValue}); - session.transfer(flowFile, REL_FAILURE); - return; - } - rateValue = Long.parseLong(attributeValue); - break; - default: - throw new AssertionError(" property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue()); - } + if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) { + logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long", + new Object[] {flowFile, rateControlAttributeName, attributeValue}); + session.transfer(flowFile, REL_FAILURE); + return; + } + rateValue = Long.parseLong(attributeValue); + break; + default: + throw new AssertionError(" property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue()); + } - final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue(); - final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName); - Throttle throttle = throttleMap.get(groupName); - if (throttle == null) { - throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger); + final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue(); + final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName); + Throttle throttle = throttleMap.get(groupName); + if (throttle == null) { + throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger); - final String maxRateValue = context.getProperty(MAX_RATE).getValue(); - final long newRate; - if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) { - newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue(); - } else { - newRate = Long.parseLong(maxRateValue); - } - throttle.setMaxRate(newRate); + final String maxRateValue = context.getProperty(MAX_RATE).getValue(); + final long newRate; + if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) { + newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue(); + } else { + newRate = Long.parseLong(maxRateValue); + } + throttle.setMaxRate(newRate); - throttleMap.put(groupName, throttle); - } + throttleMap.put(groupName, throttle); + } - throttle.lock(); - try { - if (throttle.tryAdd(rateValue)) { - logger.info("transferring {} to 'success'", new Object[]{flowFile}); - session.transfer(flowFile, REL_SUCCESS); - } else { - flowFile = session.penalize(flowFile); - session.transfer(flowFile); - } - } finally { - throttle.unlock(); - } - } + throttle.lock(); + try { + if (throttle.tryAdd(rateValue)) { + logger.info("transferring {} to 'success'", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + } else { + flowFile = session.penalize(flowFile); + session.transfer(flowFile); + } + } finally { + throttle.unlock(); + } + } - private static class TimestampedLong { + private static class TimestampedLong { - private final Long value; - private final long timestamp = System.currentTimeMillis(); + private final Long value; + private final long timestamp = System.currentTimeMillis(); - public TimestampedLong(final Long value) { - this.value = value; - } + public TimestampedLong(final Long value) { + this.value = value; + } - public Long getValue() { - return value; - } + public Long getValue() { + return value; + } - public long getTimestamp() { - return timestamp; - } - } + public long getTimestamp() { + return timestamp; + } + } - private static class RateEntityAccess implements EntityAccess { + private static class RateEntityAccess implements EntityAccess { - @Override - public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) { - if (oldValue == null && toAdd == null) { - return new TimestampedLong(0L); - } else if (oldValue == null) { - return toAdd; - } else if (toAdd == null) { - return oldValue; - } + @Override + public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) { + if (oldValue == null && toAdd == null) { + return new TimestampedLong(0L); + } else if (oldValue == null) { + return toAdd; + } else if (toAdd == null) { + return oldValue; + } - return new TimestampedLong(oldValue.getValue() + toAdd.getValue()); - } + return new TimestampedLong(oldValue.getValue() + toAdd.getValue()); + } - @Override - public TimestampedLong createNew() { - return new TimestampedLong(0L); - } + @Override + public TimestampedLong createNew() { + return new TimestampedLong(0L); + } - @Override - public long getTimestamp(TimestampedLong entity) { - return entity == null ? 0L : entity.getTimestamp(); - } - } + @Override + public long getTimestamp(TimestampedLong entity) { + return entity == null ? 0L : entity.getTimestamp(); + } + } - private static class Throttle extends ReentrantLock { + private static class Throttle extends ReentrantLock { - private final AtomicLong maxRate = new AtomicLong(1L); - private final long timePeriodValue; - private final TimeUnit timePeriodUnit; - private final TimedBuffer timedBuffer; - private final ProcessorLog logger; + private final AtomicLong maxRate = new AtomicLong(1L); + private final long timePeriodValue; + private final TimeUnit timePeriodUnit; + private final TimedBuffer timedBuffer; + private final ProcessorLog logger; - private volatile long penalizationExpired; - private volatile long lastUpdateTime; + private volatile long penalizationExpired; + private volatile long lastUpdateTime; - public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) { - this.timePeriodUnit = unit; - this.timePeriodValue = timePeriod; - this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess()); - this.logger = logger; - } + public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) { + this.timePeriodUnit = unit; + this.timePeriodValue = timePeriod; + this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess()); + this.logger = logger; + } - public void setMaxRate(final long maxRate) { - this.maxRate.set(maxRate); - } + public void setMaxRate(final long maxRate) { + this.maxRate.set(maxRate); + } - public long lastUpdateTime() { - return lastUpdateTime; - } + public long lastUpdateTime() { + return lastUpdateTime; + } - public boolean tryAdd(final long value) { - final long now = System.currentTimeMillis(); - if (penalizationExpired > now) { - return false; - } + public boolean tryAdd(final long value) { + final long now = System.currentTimeMillis(); + if (penalizationExpired > now) { + return false; + } - final long maxRateValue = maxRate.get(); + final long maxRateValue = maxRate.get(); - final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit)); - if (sum != null && sum.getValue() >= maxRateValue) { - logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value}); - return false; - } + final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit)); + if (sum != null && sum.getValue() >= maxRateValue) { + logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[] {sum.getValue(), value}); + return false; + } - logger.debug("current sum for throttle is {}, so allowing rate of {} through", - new Object[]{sum == null ? 0 : sum.getValue(), value}); + logger.debug("current sum for throttle is {}, so allowing rate of {} through", + new Object[] {sum == null ? 0 : sum.getValue(), value}); - final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue(); - if (transferred > maxRateValue) { - final long amountOver = transferred - maxRateValue; - // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long - final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit); - final double pct = (double) amountOver / (double) maxRateValue; - final long penalizationPeriod = (long) (milliDuration * pct); - this.penalizationExpired = now + penalizationPeriod; - logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod}); - } + final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue(); + if (transferred > maxRateValue) { + final long amountOver = transferred - maxRateValue; + // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long + final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit); + final double pct = (double) amountOver / (double) maxRateValue; + final long penalizationPeriod = (long) (milliDuration * pct); + this.penalizationExpired = now + penalizationPeriod; + logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[] {value, penalizationPeriod}); + } - lastUpdateTime = now; - return true; - } - } + lastUpdateTime = now; + return true; + } + } }