NIFI-12457: Added additional use case based documentation and fixed some typos/made some clarifications in related docs

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8108.
This commit is contained in:
Mark Payne 2023-12-03 16:07:56 -05:00 committed by Pierre Villard
parent 4f26b93615
commit 283abadf97
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 111 additions and 17 deletions

View File

@ -129,6 +129,63 @@ import java.util.concurrent.TimeUnit;
) )
} }
) )
@MultiProcessorUseCase(
description = "Retrieve only files from S3 that meet some specified criteria",
keywords = {"s3", "state", "retrieve", "filter", "select", "fetch", "criteria"},
configurations = {
@ProcessorConfiguration(
processorClass = ListS3.class,
configuration = """
The "Bucket" property should be set to the name of the S3 bucket that files reside in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \
this property by setting it to something like `#{S3_SOURCE_BUCKET}`.
The "Region" property must be set to denote the S3 region that the Bucket resides in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \
this property by setting it to something like `#{S3_SOURCE_REGION}`.
The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket.
The 'success' Relationship of this Processor is then connected to RouteOnAttribute.
"""
),
@ProcessorConfiguration(
processorClassName = "org.apache.nifi.processors.standard.RouteOnAttribute",
configuration = """
If you would like to "OR" together all of the conditions (i.e., the file should be retrieved if any of the conditions are met), \
set "Routing Strategy" to "Route to 'matched' if any matches".
If you would like to "AND" together all of the conditions (i.e., the file should only be retrieved if all of the conditions are met), \
set "Routing Strategy" to "Route to 'matched' if all match".
For each condition that you would like to filter on, add a new property. The name of the property should describe the condition. \
The value of the property should be an Expression Language expression that returns `true` if the file meets the condition or `false` \
if the file does not meet the condition.
Some attributes that you may consider filtering on are:
- `filename` (the name of the file)
- `s3.length` (the number of bytes in the file)
- `s3.tag.<tag name>` (the value of the s3 tag with the name `tag name`)
- `s3.user.metadata.<key name>` (the value of the user metadata with the key named `key name`)
For example, to fetch only files that are at least 1 MB and have a filename ending in `.zip` we would set the following properties:
- "Routing Strategy" = "Route to 'matched' if all match"
- "At least 1 MB" = "${s3.length:ge(1000000)}"
- "Ends in .zip" = "${filename:endsWith('.zip')}"
Auto-terminate the `unmatched` Relationship.
Connect the `matched` Relationship to the FetchS3Object processor.
"""
),
@ProcessorConfiguration(
processorClass = FetchS3Object.class,
configuration = """
"Bucket" = "${s3.bucket}"
"Object Key" = "${filename}"
The "AWS Credentials Provider service" property should specify an instance of the AWSCredentialsProviderControllerService in order to provide credentials for accessing the bucket.
The "Region" property must be set to the same value as the "Region" property of the ListS3 Processor.
"""
)
}
)
@MultiProcessorUseCase( @MultiProcessorUseCase(
description = "Retrieve new files as they arrive in an S3 bucket", description = "Retrieve new files as they arrive in an S3 bucket",
notes = "This method of retrieving files from S3 is more efficient than using ListS3 and more cost effective. It is the pattern recommended by AWS. " + notes = "This method of retrieving files from S3 is more efficient than using ListS3 and more cost effective. It is the pattern recommended by AWS. " +

View File

@ -117,7 +117,7 @@ import java.util.stream.Stream;
Choose a RecordWriter that writes the data in the desired output format. Choose a RecordWriter that writes the data in the desired output format.
Add a single additional property. The name of the property should describe the criteria. \ Add a single additional property. The name of the property should describe the criteria. \
The property's value should be a RecordPath that returns `true` if the Record meets the criteria or `false otherwise. The property's value should be a RecordPath that returns `true` if the Record meets the criteria or `false` otherwise.
For example, if we want to separate records based on whether or not they have a transaction total of more than $1,000 we could add a new property named \ For example, if we want to separate records based on whether or not they have a transaction total of more than $1,000 we could add a new property named \
`largeTransaction` with a value of `/transaction/total > 1000`. This will create two FlowFiles. In the first, all records will have a total over `1000`. \ `largeTransaction` with a value of `/transaction/total > 1000`. This will create two FlowFiles. In the first, all records will have a total over `1000`. \

View File

@ -26,6 +26,8 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -67,20 +69,21 @@ import java.util.concurrent.atomic.AtomicReference;
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"attributes", "routing", "Attribute Expression Language", "regexp", "regex", "Regular Expression", "Expression Language", "find", "text", "string", "search", "filter", "detect"}) @Tags({"attributes", "routing", "Attribute Expression Language", "regexp", "regex", "Regular Expression", "Expression Language", "find", "text", "string", "search", "filter", "detect"})
@CapabilityDescription("Routes FlowFiles based on their Attributes using the Attribute Expression Language") @CapabilityDescription("Routes FlowFiles based on their Attributes using the Attribute Expression Language")
@DynamicProperty(name = "Relationship Name", value = "Attribute Expression Language", @DynamicProperty(name = "Relationship Name", value = "Expression Language expression that returns a boolean value indicating whether or not the FlowFile should be routed to this Relationship",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Routes FlowFiles whose attributes match the " expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ "Attribute Expression Language specified in the Dynamic Property Value to the Relationship specified in the Dynamic Property Key") description = "Routes FlowFiles whose attributes match the Expression Language specified in the Dynamic Property Value to the Relationship " +
"specified in the Dynamic Property Key")
@DynamicRelationship(name = "Name from Dynamic Property", description = "FlowFiles that match the Dynamic Property's Attribute Expression Language") @DynamicRelationship(name = "Name from Dynamic Property", description = "FlowFiles that match the Dynamic Property's Attribute Expression Language")
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = RouteOnAttribute.ROUTE_ATTRIBUTE_KEY, description = "The relation to which the FlowFile was routed") @WritesAttribute(attribute = RouteOnAttribute.ROUTE_ATTRIBUTE_KEY, description = "The relation to which the FlowFile was routed")
}) })
@UseCase( @UseCase(
description = "Route data to one or more relationships based on its attributes.", description = "Route data to one or more relationships based on its attributes using the NiFi Expression Language.",
keywords = {"attributes", "routing", "expression language"}, keywords = {"attributes", "routing", "expression language"},
configuration = """ configuration = """
Set the "Routing Strategy" property to "Route to Property name". Set the "Routing Strategy" property to "Route to Property name".
For each route that a FlowFile might be routed to, add a new property. The name of the property should describe the route. For each route that a FlowFile might be routed to, add a new property. The name of the property should describe the route.
The value of the property is an Attribute Expression Language expression that will be used to determine whether or not a given FlowFile will be routed to the \ The value of the property is an Attribute Expression Language expression that returns a boolean value indicating whether or not a given FlowFile will be routed to the \
associated relationship. associated relationship.
For example, we might route data based on its file extension using the following properties: For example, we might route data based on its file extension using the following properties:
@ -133,6 +136,39 @@ import java.util.concurrent.atomic.AtomicReference;
Connect the 'unmatched' relationship to the next processor in the flow. Connect the 'unmatched' relationship to the next processor in the flow.
""" """
) )
@MultiProcessorUseCase(
description = "Route record-oriented data based on whether or not the record's values meet some criteria",
keywords = {"record", "route", "content", "data"},
configurations = {
@ProcessorConfiguration(
processorClass = PartitionRecord.class,
configuration = """
Choose a RecordReader that is appropriate based on the format of the incoming data.
Choose a RecordWriter that writes the data in the desired output format.
Add a single additional property. The name of the property should describe the criteria to route on. \
The property's value should be a RecordPath that returns `true` if the Record meets the criteria or `false` otherwise. \
This adds a new attribute to the FlowFile whose name is equal to the property name.
Connect the 'success' Relationship to RouteOnAttribute.
"""
),
@ProcessorConfiguration(
processorClass = RouteOnAttribute.class,
configuration = """
Set "Routing Strategy" to "Route to Property name"
Add two additional properties. For the first one, the name of the property should describe data that matches the criteria. \
The value is an Expression Language expression that checks if the attribute added by the PartitionRecord processor has a value of `true`. \
For example, `${criteria:equals('true')}`.
The second property should have a name that describes data that does not match the criteria. The value is an Expression Language that evaluates to the \
opposite of the first property value. For example, `${criteria:equals('true'):not()}`.
Connect each of the newly created Relationships to the appropriate downstream processors.
"""
)
}
)
public class RouteOnAttribute extends AbstractProcessor { public class RouteOnAttribute extends AbstractProcessor {
public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnAttribute.Route"; public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnAttribute.Route";

View File

@ -60,22 +60,21 @@ import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@SideEffectFree @SideEffectFree
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"update", "record", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"}) @Tags({"update", "record", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
@CapabilityDescription("Updates the contents of a FlowFile that contains Record-oriented data (i.e., data that can be read via a RecordReader and written by a RecordWriter). " @CapabilityDescription("Updates the contents of a FlowFile that contains Record-oriented data (i.e., data that can be read via a RecordReader and written by a RecordWriter). "
+ "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should " + "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should "
+ "be updated. The value of the Property is either a replacement value (optionally making use of the Expression Language) or is itself a RecordPath that extracts a value from " + "be updated. The value of the Property is either a replacement value (optionally making use of the Expression Language) or is itself a RecordPath that extracts a value from "
+ "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the <Replacement Value Strategy> Property.") + "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the <Replacement Value Strategy> Property.")
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "record.index", description = "This attribute provides the current row index and is only available inside the literal value expression."), @WritesAttribute(attribute = "record.index", description = "This attribute provides the current row index and is only available inside the literal value expression."),
@WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.") @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
}) })
@DynamicProperty(name = "A RecordPath.", value = "The value to use to replace fields in the record that match the RecordPath", @DynamicProperty(name = "A RecordPath.", value = "The value to use to replace fields in the record that match the RecordPath",
description = "Allows users to specify values to use to replace fields in the record that match the RecordPath.", description = "Allows users to specify values to use to replace fields in the record that match the RecordPath.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@SeeAlso({ConvertRecord.class}) @SeeAlso({ConvertRecord.class})
@UseCase( @UseCase(
description = "Combine multiple fields into a single field.", description = "Combine multiple fields into a single field.",
@ -124,8 +123,9 @@ import java.util.stream.Stream;
""" """
) )
@UseCase( @UseCase(
description = "Change the format of a record field.", description = "Change the format of a record field's value.",
keywords = {"change", "update", "replace", "insert", "transform", "format", "date/time", "timezone", "expression language"}, keywords = {"change", "update", "replace", "insert", "transform", "format", "date/time", "timezone", "expression language"},
notes = "Use the RenameRecordField Processor in order to change a field's name.",
configuration = """ configuration = """
"Replacement Value Strategy" = "Literal Value" "Replacement Value Strategy" = "Literal Value"
@ -148,11 +148,11 @@ public class UpdateRecord extends AbstractRecordProcessor {
static final AllowableValue LITERAL_VALUES = new AllowableValue("literal-value", "Literal Value", static final AllowableValue LITERAL_VALUES = new AllowableValue("literal-value", "Literal Value",
"The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with. Expression Language " "The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with. Expression Language "
+ "may reference variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated."); + "may reference variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated.");
static final AllowableValue RECORD_PATH_VALUES = new AllowableValue("record-path-value", "Record Path Value", static final AllowableValue RECORD_PATH_VALUES = new AllowableValue("record-path-value", "Record Path Value",
"The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path " "The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path "
+ "that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. Note that if this option is selected, " + "that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. Note that if this option is selected, "
+ "and the Record Path results in multiple values for a given Record, the input FlowFile will be routed to the 'failure' Relationship."); + "and the Record Path results in multiple values for a given Record, the input FlowFile will be routed to the 'failure' Relationship.");
static final PropertyDescriptor REPLACEMENT_VALUE_STRATEGY = new PropertyDescriptor.Builder() static final PropertyDescriptor REPLACEMENT_VALUE_STRATEGY = new PropertyDescriptor.Builder()
.name("Replacement Value Strategy") .name("Replacement Value Strategy")
@ -357,4 +357,5 @@ public class UpdateRecord extends AbstractRecordProcessor {
return selectedFields.get(0); return selectedFields.get(0);
} }
} }
} }