From d049bca7948227cdddcd59a50c2daf104e2883f8 Mon Sep 17 00:00:00 2001
From: Peter Gyori
Date: Fri, 5 Feb 2021 18:49:39 +0100
Subject: [PATCH] NIFI-8205: Documentation improvements for the Wait processor
Signed-off-by: Pierre Villard
This closes #4808.
---
.../apache/nifi/processors/standard/Wait.java | 39 ++++---
.../additionalDetails.html | 110 ++++++++++++++++--
2 files changed, 127 insertions(+), 22 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 37f4479301..83ac2e82fd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -72,9 +72,15 @@ import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJE
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal "
+ "is stored in the distributed cache from a corresponding Notify processor. "
- + "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship, "
- + "with attributes copied from the FlowFile that produced the release signal from the Notify processor. "
- + "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+ + "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship. "
+ + "The release signal entry is then removed from the cache. "
+ + "The attributes of the FlowFile that produced the release signal are copied to the waiting FlowFile if the Attribute Cache Regex "
+ + "property of the corresponding Notify processor is set properly. "
+ + "If there are multiple release signals in the cache identified by the Release Signal Identifier, "
+ + "and the Notify processor is configured to copy the FlowFile attributes to the cache, "
+ + "then the FlowFile passing the Wait processor receives the union of the attributes of the FlowFiles "
+ + "that produced the release signals in the cache (identified by Release Signal Identifier). "
+ + "Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+ "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
+ "This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. "
@@ -88,8 +94,9 @@ import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJE
@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+ "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile. "
+ "This attribute is not written when the FlowFile is transferred to failure, expired or success"),
- @WritesAttribute(attribute = "wait.counter.", description = "If a signal exists when the processor runs, "
- + "each count value in the signal is copied.")
+ @WritesAttribute(attribute = "wait.counter.", description = "The name of each counter for which at least one signal "
+ + "has been present in the cache since the last time the cache was empty "
+ + "gets copied to the current FlowFile as an attribute.")
})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.Notify"})
@@ -110,8 +117,9 @@ public class Wait extends AbstractProcessor {
public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder()
.name("release-signal-id")
.displayName("Release Signal Identifier")
- .description("A value, or the results of an Attribute Expression Language statement, which will " +
- "be evaluated against a FlowFile in order to determine the release signal cache key")
+ .description("A value that specifies the key to a specific release signal cache. "
+ + "To decide whether the FlowFile that is being processed by the Wait processor should be sent to the 'success' "
+ + "or the 'wait' relationship, the processor checks the signals in the cache specified by this key.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -120,11 +128,12 @@ public class Wait extends AbstractProcessor {
public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
.name("target-signal-count")
.displayName("Target Signal Count")
- .description("A value, or the results of an Attribute Expression Language statement, which will " +
- "be evaluated against a FlowFile in order to determine the target signal count. " +
- "This processor checks whether the signal count has reached this number. " +
- "If Signal Counter Name is specified, this processor checks a particular counter, " +
- "otherwise checks against total count in a signal.")
+ .description("The number of signals that need to be in the cache (specified by the Release Signal Identifier) "
+ + "in order for the FlowFile processed by the Wait processor to be sent to the ‘success’ relationship. "
+ + "If the number of signals in the cache has reached this number, the FlowFile is routed to the "
+ + "'success' relationship and the number of signals in the cache is decreased by this value. "
+ + "If Signal Counter Name is specified, this processor checks a particular counter, "
+ + "otherwise checks against the total number of signals in the cache.")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -134,9 +143,9 @@ public class Wait extends AbstractProcessor {
public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
.name("signal-counter-name")
.displayName("Signal Counter Name")
- .description("A value, or the results of an Attribute Expression Language statement, which will " +
- "be evaluated against a FlowFile in order to determine the signal counter name. " +
- "If not specified, this processor checks the total count in a signal.")
+ .description("Within the cache (specified by the Release Signal Identifier) the signals may belong to different counters. "
+ + "If this property is specified, the processor checks the number of signals in the cache that belong to this particular counter. "
+ + "If not specified, the processor checks the total number of signals in the cache.")
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html
index 4f54271c93..f81a2ca044 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html
@@ -15,9 +15,9 @@
limitations under the License.
-->
-
- ValidateCsv
-
+
+ Wait
+
@@ -32,10 +32,10 @@ When a Wait processor is expected to process multiple signal ids, by configuring
Also, you will need to have high level understanding on how Wait processor works:
-
Wait processor only process a single signal id at a time
-
How frequent Wait processor runs is defined at 'Run Schedule'
-
Which FlowFile is processed is determined by Prioritizer
-
Not limited to Wait processor, but for all processors, the order of queued FlowFiles in a connection is undefined if no Prioritizer is set
+
The Wait processor only processes a single signal id at a time
+
How frequently the Wait processor runs is defined in the 'Run Schedule'
+
Which FlowFile is processed is determined by a Prioritizer
+
Not limited to the Wait processor, but for all processors, the order of queued FlowFiles in a connection is undefined if no Prioritizer is set
@@ -273,5 +273,101 @@ Repeat above cycle, thus F5 will not be released until one of F1 ... F4 is relea
To mitigate such limitation, increasing 'Run Duration' is recommended. By increasing 'Run Duration', Wait processor can keep being scheduled for that duration. For example, with 'Run Duration' 500 ms, Wait should be able to loop through all 5 queued FlowFiles at a single run.
+
Using counters
+
+ A counter is basically a label to differentiate signals within the cache.
+ (A cache in this context is a "container" that contains signals that have the same signal identifier.)
+
+
+ Let's suppose that there are the following signals in the cache
+ (note, that these are not FlowFiles on the incoming (or wait) connection of the Wait processor, like
+ in the examples above, but release signals stored in the cache.)
+
+
+
+
Signal ID
+
Signal Counter Name
+
+
+
A
+
counter_1
+
+
+
A
+
counter_1
+
+
+
A
+
counter_2
+
+
+
+ In this state, the following FlowFile gets processed by the Wait processor,
+ (the FlowFile has a signal_counter_name attribute and the Wait processor is configured
+ to use the value of this attribute as the value of the Signal Counter Name property):
+
+
+
+
FlowFile UUID
+
Signal ID
+
signal_counter_name
+
+
+
a-1
+
A
+
counter_3
+
+
+
+ Despite the fact that the cache identified by Signal ID "A" has signals in it,
+ the FlowFile above will be sent to the 'wait' relationship, since there is no signal
+ in the cache that belongs to the counter named "counter_3".
+
+
+ Let's suppose, that the state of the cache is the same as above, and the following
+ FlowFile gets processed by the Wait processor:
+
+
+
+
FlowFile UUID
+
Signal ID
+
signal_counter_name
+
+
+
a-2
+
A
+
counter_1
+
+
+
+ The FlowFile is transmitted to the 'success' relationship, since cache "A" has
+ signals in it and there are signals that belong to "counter_1". The outgoing FlowFile
+ will have the following attributes and their values appended to it:
+
+
+
wait.counter.counter_1 : 2
+
wait.counter.counter_2 : 1
+
wait.counter.total : 3
+
+
+ The key point here is that counters can be used to differentiate between signals within the
+ cache. If counters are used, a new attribute will be appended to the FlowFile
+ passing the Wait processor for each counter. If a large number of counters are used
+ within a cache, the FlowFile passing the Wait processor will have a large number of
+ attributes appended to it. To avoid that, it is recommended to use multiple caches with
+ a few counters in each, instead of one cache with many counters.
+
+
+ For example:
+
+
+
Cache identified by Release Signal ID "A" has counters: "counter_1" and "counter_2"
+
Cache identified by Release Signal ID "B" has counters: "counter_3" and "counter_4"
+
Cache identified by Release Signal ID "C" has counters: "counter_5" and "counter_6"
+
+
+(Counter names do not need to be unique between caches, the counter name(s) used in cache "A"
+could be reused in cache "B" and "C" as well.)
+