diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 7a99ed3cfa..c0cae164b8 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -129,6 +129,63 @@ import java.util.concurrent.TimeUnit; ) } ) +@MultiProcessorUseCase( + description = "Retrieve only files from S3 that meet some specified criteria", + keywords = {"s3", "state", "retrieve", "filter", "select", "fetch", "criteria"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListS3.class, + configuration = """ + The "Bucket" property should be set to the name of the S3 bucket that files reside in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \ + this property by setting it to something like `#{S3_SOURCE_BUCKET}`. + The "Region" property must be set to denote the S3 region that the Bucket resides in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \ + this property by setting it to something like `#{S3_SOURCE_REGION}`. + + The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket. + + The 'success' Relationship of this Processor is then connected to RouteOnAttribute. + """ + ), + @ProcessorConfiguration( + processorClassName = "org.apache.nifi.processors.standard.RouteOnAttribute", + configuration = """ + If you would like to "OR" together all of the conditions (i.e., the file should be retrieved if any of the conditions are met), \ + set "Routing Strategy" to "Route to 'matched' if any matches". + If you would like to "AND" together all of the conditions (i.e., the file should only be retrieved if all of the conditions are met), \ + set "Routing Strategy" to "Route to 'matched' if all match". + + For each condition that you would like to filter on, add a new property. The name of the property should describe the condition. \ + The value of the property should be an Expression Language expression that returns `true` if the file meets the condition or `false` \ + if the file does not meet the condition. + + Some attributes that you may consider filtering on are: + - `filename` (the name of the file) + - `s3.length` (the number of bytes in the file) + - `s3.tag.` (the value of the s3 tag with the name `tag name`) + - `s3.user.metadata.` (the value of the user metadata with the key named `key name`) + + For example, to fetch only files that are at least 1 MB and have a filename ending in `.zip` we would set the following properties: + - "Routing Strategy" = "Route to 'matched' if all match" + - "At least 1 MB" = "${s3.length:ge(1000000)}" + - "Ends in .zip" = "${filename:endsWith('.zip')}" + + Auto-terminate the `unmatched` Relationship. + Connect the `matched` Relationship to the FetchS3Object processor. + """ + ), + @ProcessorConfiguration( + processorClass = FetchS3Object.class, + configuration = """ + "Bucket" = "${s3.bucket}" + "Object Key" = "${filename}" + + The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket. + + The "Region" property must be set to the same value as the "Region" property of the ListS3 Processor. + """ + ) + } +) @MultiProcessorUseCase( description = "Retrieve new files as they arrive in an S3 bucket", notes = "This method of retrieving files from S3 is more efficient than using ListS3 and more cost effective. It is the pattern recommended by AWS. " + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java index a7f31165d8..85755887cb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java @@ -117,7 +117,7 @@ import java.util.stream.Stream; Choose a RecordWriter that writes the data in the desired output format. Add a single additional property. The name of the property should describe the criteria. \ - The property's value should be a RecordPath that returns `true` if the Record meets the criteria or `false otherwise. + The property's value should be a RecordPath that returns `true` if the Record meets the criteria or `false` otherwise. For example, if we want to separate records based on whether or not they have a transaction total of more than $1,000 we could add a new property named \ `largeTransaction` with a value of `/transaction/total > 1000`. This will create two FlowFiles. In the first, all records will have a total over `1000`. \ diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java index f36e5f85c7..2b2f4c931d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java @@ -26,6 +26,8 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -67,20 +69,21 @@ import java.util.concurrent.atomic.AtomicReference; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"attributes", "routing", "Attribute Expression Language", "regexp", "regex", "Regular Expression", "Expression Language", "find", "text", "string", "search", "filter", "detect"}) @CapabilityDescription("Routes FlowFiles based on their Attributes using the Attribute Expression Language") -@DynamicProperty(name = "Relationship Name", value = "Attribute Expression Language", - expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Routes FlowFiles whose attributes match the " - + "Attribute Expression Language specified in the Dynamic Property Value to the Relationship specified in the Dynamic Property Key") +@DynamicProperty(name = "Relationship Name", value = "Expression Language expression that returns a boolean value indicating whether or not the FlowFile should be routed to this Relationship", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Routes FlowFiles whose attributes match the Expression Language specified in the Dynamic Property Value to the Relationship " + + "specified in the Dynamic Property Key") @DynamicRelationship(name = "Name from Dynamic Property", description = "FlowFiles that match the Dynamic Property's Attribute Expression Language") @WritesAttributes({ @WritesAttribute(attribute = RouteOnAttribute.ROUTE_ATTRIBUTE_KEY, description = "The relation to which the FlowFile was routed") }) @UseCase( - description = "Route data to one or more relationships based on its attributes.", + description = "Route data to one or more relationships based on its attributes using the NiFi Expression Language.", keywords = {"attributes", "routing", "expression language"}, configuration = """ Set the "Routing Strategy" property to "Route to Property name". For each route that a FlowFile might be routed to, add a new property. The name of the property should describe the route. - The value of the property is an Attribute Expression Language expression that will be used to determine whether or not a given FlowFile will be routed to the \ + The value of the property is an Attribute Expression Language expression that returns a boolean value indicating whether or not a given FlowFile will be routed to the \ associated relationship. For example, we might route data based on its file extension using the following properties: @@ -133,6 +136,39 @@ import java.util.concurrent.atomic.AtomicReference; Connect the 'unmatched' relationship to the next processor in the flow. """ ) +@MultiProcessorUseCase( + description = "Route record-oriented data based on whether or not the record's values meet some criteria", + keywords = {"record", "route", "content", "data"}, + configurations = { + @ProcessorConfiguration( + processorClass = PartitionRecord.class, + configuration = """ + Choose a RecordReader that is appropriate based on the format of the incoming data. + Choose a RecordWriter that writes the data in the desired output format. + + Add a single additional property. The name of the property should describe the criteria to route on. \ + The property's value should be a RecordPath that returns `true` if the Record meets the criteria or `false` otherwise. \ + This adds a new attribute to the FlowFile whose name is equal to the property name. + + Connect the 'success' Relationship to RouteOnAttribute. + """ + ), + @ProcessorConfiguration( + processorClass = RouteOnAttribute.class, + configuration = """ + Set "Routing Strategy" to "Route to Property name" + + Add two additional properties. For the first one, the name of the property should describe data that matches the criteria. \ + The value is an Expression Language expression that checks if the attribute added by the PartitionRecord processor has a value of `true`. \ + For example, `${criteria:equals('true')}`. + The second property should have a name that describes data that does not match the criteria. The value is an Expression Language that evaluates to the \ + opposite of the first property value. For example, `${criteria:equals('true'):not()}`. + + Connect each of the newly created Relationships to the appropriate downstream processors. + """ + ) + } +) public class RouteOnAttribute extends AbstractProcessor { public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnAttribute.Route"; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java index 73e448f788..42a020b273 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -60,22 +60,21 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; - @SideEffectFree @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"update", "record", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"}) @CapabilityDescription("Updates the contents of a FlowFile that contains Record-oriented data (i.e., data that can be read via a RecordReader and written by a RecordWriter). " - + "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should " - + "be updated. The value of the Property is either a replacement value (optionally making use of the Expression Language) or is itself a RecordPath that extracts a value from " - + "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the Property.") + + "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should " + + "be updated. The value of the Property is either a replacement value (optionally making use of the Expression Language) or is itself a RecordPath that extracts a value from " + + "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the Property.") @WritesAttributes({ @WritesAttribute(attribute = "record.index", description = "This attribute provides the current row index and is only available inside the literal value expression."), @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.") }) @DynamicProperty(name = "A RecordPath.", value = "The value to use to replace fields in the record that match the RecordPath", - description = "Allows users to specify values to use to replace fields in the record that match the RecordPath.", - expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + description = "Allows users to specify values to use to replace fields in the record that match the RecordPath.", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @SeeAlso({ConvertRecord.class}) @UseCase( description = "Combine multiple fields into a single field.", @@ -124,8 +123,9 @@ import java.util.stream.Stream; """ ) @UseCase( - description = "Change the format of a record field.", + description = "Change the format of a record field's value.", keywords = {"change", "update", "replace", "insert", "transform", "format", "date/time", "timezone", "expression language"}, + notes = "Use the RenameRecordField Processor in order to change a field's name.", configuration = """ "Replacement Value Strategy" = "Literal Value" @@ -148,11 +148,11 @@ public class UpdateRecord extends AbstractRecordProcessor { static final AllowableValue LITERAL_VALUES = new AllowableValue("literal-value", "Literal Value", "The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with. Expression Language " - + "may reference variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated."); + + "may reference variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated."); static final AllowableValue RECORD_PATH_VALUES = new AllowableValue("record-path-value", "Record Path Value", "The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path " - + "that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. Note that if this option is selected, " - + "and the Record Path results in multiple values for a given Record, the input FlowFile will be routed to the 'failure' Relationship."); + + "that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. Note that if this option is selected, " + + "and the Record Path results in multiple values for a given Record, the input FlowFile will be routed to the 'failure' Relationship."); static final PropertyDescriptor REPLACEMENT_VALUE_STRATEGY = new PropertyDescriptor.Builder() .name("Replacement Value Strategy") @@ -357,4 +357,5 @@ public class UpdateRecord extends AbstractRecordProcessor { return selectedFields.get(0); } } -} \ No newline at end of file +} +