NIFI-12291: Added additional Use Case Docs

This closes #7954

Signed-off-by: David Handermann <>
This commit is contained in:
Mark Payne 2023-10-30 14:15:16 -04:00 committed by exceptionfactory
parent 5c98b7cce0
commit 960498f76b
No known key found for this signature in database
5 changed files with 143 additions and 15 deletions

View File

@ -27,19 +27,9 @@ import com.slack.api.bolt.context.builtin.SlashCommandContext;
import com.slack.api.bolt.request.builtin.SlashCommandRequest;
import com.slack.api.bolt.response.Response;
import com.slack.api.bolt.socket_mode.SocketModeApp;
import com.slack.api.model.event.FileSharedEvent;
import com.slack.api.model.event.MessageEvent;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import com.slack.api.model.event.MessageFileShareEvent;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -61,6 +51,19 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@DefaultSettings(yieldDuration = "250 millis")
@ -149,6 +152,8 @@ public class ListenSlack extends AbstractProcessor {
// Register the event types that make sense
if (context.getProperty(EVENT_TYPE).getValue().equals(RECEIVE_MESSAGE_EVENTS.getValue())) {
slackApp.event(MessageEvent.class, this::handleEvent);
slackApp.event(MessageFileShareEvent.class, this::handleEvent);
slackApp.event(FileSharedEvent.class, this::handleEvent);
} else {
slackApp.command(Pattern.compile(".*"), this::handleCommand);

View File

@ -26,8 +26,11 @@ import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
@ -97,6 +100,69 @@ import;
@WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.")
@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class})
description = "Combine together many arbitrary Records in order to create a single, larger file",
configuration = """
Configure the "Record Reader" to specify a Record Reader that is appropriate for the incoming data type.
Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output data type.
Set "Merge Strategy" to `Bin-Packing Algorithm`.
Set the "Minimum Bin Size" to desired file size of the merged output file. For example, a value of `1 MB` will result in not merging data until at least
1 MB of data is available (unless the Max Bin Age is reached first). If there is no desired minimum file size, leave the default value of `0 B`.
Set the "Minimum Number of Records" property to the minimum number of Records that should be included in the merged output file. For example, setting the value
to `10000` ensures that the output file will have at least 10,000 Records in it (unless the Max Bin Age is reached first).
Set the "Max Bin Age" to specify the maximum amount of time to hold data before merging. This can be thought of as a "timeout" at which time the Processor will
merge whatever data it is, even if the "Minimum Bin Size" and "Minimum Number of Records" has not been reached. It is always recommended to set the value.
A reasonable default might be `10 mins` if there is no other latency requirement.
Connect the 'merged' Relationship to the next component in the flow. Auto-terminate the 'original' Relationship.
description = "Combine together many Records that have the same value for a particular field in the data, in order to create a single, larger file",
keywords = {"merge", "combine", "aggregate", "like records", "similar data"},
configurations = {
processorClass = PartitionRecord.class,
configuration = """
Configure the "Record Reader" to specify a Record Reader that is appropriate for the incoming data type.
Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output data type.
Add a single additional property. The name of the property should describe the field on which the data is being merged together.
The property's value should be a RecordPath that specifies which output FlowFile the Record belongs to.
For example, to merge together data that has the same value for the "productSku" field, add a property named `productSku` with a value of `/productSku`.
Connect the "success" Relationship to MergeRecord.
Auto-terminate the "original" Relationship.
processorClass = MergeRecord.class,
configuration = """
Configure the "Record Reader" to specify a Record Reader that is appropriate for the incoming data type.
Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output data type.
Set "Merge Strategy" to `Bin-Packing Algorithm`.
Set the "Minimum Bin Size" to desired file size of the merged output file. For example, a value of `1 MB` will result in not merging data until at least
1 MB of data is available (unless the Max Bin Age is reached first). If there is no desired minimum file size, leave the default value of `0 B`.
Set the "Minimum Number of Records" property to the minimum number of Records that should be included in the merged output file. For example, setting the value
to `10000` ensures that the output file will have at least 10,000 Records in it (unless the Max Bin Age is reached first).
Set the "Maximum Number of Records" property to a value at least as large as the "Minimum Number of Records." If there is no need to limit the maximum number of
records per file, this number can be set to a value that will never be reached such as `1000000000`.
Set the "Max Bin Age" to specify the maximum amount of time to hold data before merging. This can be thought of as a "timeout" at which time the Processor will
merge whatever data it is, even if the "Minimum Bin Size" and "Minimum Number of Records" has not been reached. It is always recommended to set the value.
A reasonable default might be `10 mins` if there is no other latency requirement.
Set the value of the "Correlation Attribute Name" property to the name of the property that you added in the PartitionRecord Processor. For example, if merging data
based on the "productSku" field, the property in PartitionRecord was named `productSku` so the value of the "Correlation Attribute Name" property should
be `productSku`.
Set the "Maximum Number of Bins" property to a value that is at least as large as the different number of values that will be present for the Correlation Attribute.
For example, if you expect 1,000 different SKUs, set this value to at least `1001`. It is not advisable, though, to set the value above 10,000.
Connect the 'merged' Relationship to the next component in the flow.
Auto-terminate the 'original' Relationship.
public class MergeRecord extends AbstractSessionFactoryProcessor {
// attributes for defragmentation
public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();

View File

@ -89,7 +89,7 @@ import java.util.Set;
"Listening Port" = a unique port number.
ListenHTTP automatically unpacks files that have attribute mime.type=application/flowfile-v3.
If PackageFlowFile batches 99 FlowFiles into 1 file that InvokeHTTP sends, then the original 99 FlowFiles will be output by ListenHTTP!
If PackageFlowFile batches 99 FlowFiles into 1 file that InvokeHTTP sends, then the original 99 FlowFiles will be output by ListenHTTP.
@ -112,7 +112,7 @@ import java.util.Set;
"Packaging Format" = "application/flowfile-v3".
Connect the output of a NiFi ingress processor that reads files stored offline to the input of UnpackContent.
If PackageFlowFile batches 99 FlowFiles into 1 file that is read from storage, then the original 99 FlowFiles will be output by UnpackContent!
If PackageFlowFile batches 99 FlowFiles into 1 file that is read from storage, then the original 99 FlowFiles will be output by UnpackContent.

View File

@ -28,6 +28,7 @@ import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -46,9 +47,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
@ -91,6 +92,24 @@ import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
+ "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
description = "Retrieve all rows from a database table.",
keywords = {"jdbc", "rdbms", "cdc", "database", "table", "stream"},
configuration = """
Configure the "Database Connection Pooling Service" to specify a Connection Pooling Service so that the Processor knows how to connect to the database.
Set the "Database Type" property to the type of database to query, or "Generic" if the database vendor is not listed.
Set the "Table Name" property to the name of the table to retrieve records from.
Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output format.
Set the "Maximum-value Columns" property to a comma-separated list of columns whose values can be used to determine which values are new. For example, this might be set to
an `id` column that is a one-up number, or a `last_modified` column that is a timestamp of when the row was last modified.
Set the "Initial Load Strategy" property to "Start at Beginning".
Set the "Fetch Size" to a number that avoids loading too much data into memory on the NiFi side. For example, a value of `1000` will load up to 1,000 rows of data.
Set the "Max Rows Per Flow File" to a value that allows efficient processing, such as `1000` or `10000`.
Set the "Output Batch Size" property to a value greater than `0`. A smaller value, such as `1` or even `20` will result in lower latency but also slightly lower throughput.
A larger value such as `1000` will result in higher throughput but also higher latency. It is not recommended to set the value larger than `1000` as it can cause significant
memory utilization.
public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()

View File

@ -29,6 +29,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@ -85,6 +86,43 @@ import java.util.regex.Pattern;
description = "Updates a FlowFile attribute specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")
@WritesAttribute(attribute = "See additional details", description = "This processor may write or remove zero or more attributes as described in additional details")
@Stateful(scopes = {Scope.LOCAL}, description = "Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner.")
description = "Add a new FlowFile attribute",
configuration = """
Leave "Delete Attributes Expression" and "Stateful Variables Initial Value" unset.
Set "Store State" to "Do not store state".
Add a new property. The name of the property will become the name of the newly added attribute.
The value of the property will become the value of the newly added attribute. The value may use the NiFi Expression Language in order to reference other
attributes or call Expression Language functions.
description = "Overwrite a FlowFile attribute with a new value",
configuration = """
Leave "Delete Attributes Expression" and "Stateful Variables Initial Value" unset.
Set "Store State" to "Do not store state".
Add a new property. The name of the property will become the name of the attribute whose value will be overwritten.
The value of the property will become the new value of the attribute. The value may use the NiFi Expression Language in order to reference other
attributes or call Expression Language functions.
For example, to change the `txId` attribute to the uppercase version of its current value, add a property named `txId` with a value of `${txId:toUpper()}`
description = "Rename a file",
configuration = """
Leave "Delete Attributes Expression" and "Stateful Variables Initial Value" unset.
Set "Store State" to "Do not store state".
Add a new property whose name is `filename` and whose value is the desired filename.
For example, to set the filename to `abc.txt`, add a property named `filename` with a value of `abc.txt`.
To add the `txId` attribute as a prefix to the filename, add a property named `filename` with a value of `${txId}${filename}`.
Or, to make the filename more readable, separate the txId from the rest of the filename with a hyphen by using a value of `${txId}-${filename}`.
public class UpdateAttribute extends AbstractProcessor implements Searchable {