diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index e2975560ac..45ffcb25a9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -43,6 +44,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -212,6 +214,24 @@ public class Wait extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); + public static final PropertyDescriptor WAIT_PENALTY_DURATION = new PropertyDescriptor.Builder() + .name("wait-penalty-duration") + .displayName("Wait Penalty Duration") + .description("If configured, after a signal identifier got processed but did not meet the release criteria," + + " the signal identifier is penalized and FlowFiles having the signal identifier" + + " will not be processed again for the specified period of time," + + " so that the signal identifier will not block others to be processed." + + " This can be useful for use cases where a Wait processor is expected to process multiple signal identifiers," + + " and each signal identifier has multiple FlowFiles," + + " and also the order of releasing FlowFiles is important within a signal identifier." + + " The FlowFile order can be configured with Prioritizers." + + " IMPORTANT: There is a limitation of number of queued signals can be processed," + + " and Wait processor may not be able to check all queued signal ids. See additional details for the best practice.") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile with a matching release signal in the cache will be routed to this relationship") @@ -234,6 +254,8 @@ public class Wait extends AbstractProcessor { private final Set relationships; + private final Map signalIdPenalties = new HashMap<>(); + public Wait() { final Set rels = new HashSet<>(); rels.add(REL_SUCCESS); @@ -255,6 +277,7 @@ public class Wait extends AbstractProcessor { descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(ATTRIBUTE_COPY_MODE); descriptors.add(WAIT_MODE); + descriptors.add(WAIT_PENALTY_DURATION); return descriptors; } @@ -280,6 +303,19 @@ public class Wait extends AbstractProcessor { final List failedFilteringFlowFiles = new ArrayList<>(); final Supplier acceptResultSupplier = () -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE; + + // Clear expired penalties. + if (!signalIdPenalties.isEmpty()) { + final Iterator> penaltyIterator = signalIdPenalties.entrySet().iterator(); + final long now = System.currentTimeMillis(); + while (penaltyIterator.hasNext()) { + final Entry penalty = penaltyIterator.next(); + if (penalty.getValue() < now) { + penaltyIterator.remove(); + } + } + } + final List flowFiles = session.get(f -> { final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue(); @@ -292,6 +328,11 @@ public class Wait extends AbstractProcessor { return ACCEPT_AND_CONTINUE; } + if (signalIdPenalties.containsKey(fSignalId)) { + // This id is penalized. + return REJECT_AND_CONTINUE; + } + final String targetSignalIdStr = targetSignalId.get(); if (targetSignalIdStr == null) { // This is the first one. @@ -468,6 +509,12 @@ public class Wait extends AbstractProcessor { // Transfer FlowFiles. processedFlowFiles.entrySet().forEach(transferFlowFiles); + // Penalize signal id if no FlowFile transferred to success. + final PropertyValue waitPenaltyDuration = context.getProperty(WAIT_PENALTY_DURATION); + if (waitPenaltyDuration.isSet() && getFlowFilesFor.apply(REL_SUCCESS).isEmpty()) { + signalIdPenalties.put(signalId, System.currentTimeMillis() + waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS)); + } + // Update signal if needed. try { if (waitCompleted) { @@ -515,4 +562,12 @@ public class Wait extends AbstractProcessor { return session.putAllAttributes(flowFile, attributesToCopy); } + @OnStopped + public void onStopped(final ProcessContext context) { + signalIdPenalties.clear(); + } + + Map getSignalIdPenalties() { + return signalIdPenalties; + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html new file mode 100644 index 0000000000..4f54271c93 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html @@ -0,0 +1,277 @@ + + + + + + ValidateCsv + + + + + + + +

Best practices to handle multiple signal ids at a Wait processor

+ +When a Wait processor is expected to process multiple signal ids, by configuring 'Release Signal Identifier' with a FlowFile attribute Expression Language, there are few things to consider in order to get the expected result. Processor configuration can vary based on your requirement. + +Also, you will need to have high level understanding on how Wait processor works: +
    +
  • Wait processor only process a single signal id at a time
  • +
  • How frequent Wait processor runs is defined at 'Run Schedule'
  • +
  • Which FlowFile is processed is determined by Prioritizer
  • +
  • Not limited to Wait processor, but for all processors, the order of queued FlowFiles in a connection is undefined if no Prioritizer is set
  • +
+ + +See following sections for common patterns + + +

Release any FlowFile as soon as its signal is notified

+ +This is the most common use case. +FlowFiles are independent and can be released in any order. + +

Important configurations:

+
    +
  • Use FirstInFirstOutPrioritizer (FIFO) at 'wait' relationship (or the incoming connection if 'Wait Mode' is 'Keep in the upstream connection)
  • +
+ +The following table illustrates the notified signal ids, queued FlowFiles and what will happen at each Wait run cycle. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
# of Wait runNotified SignalsQueue Index (FIFO)FlowFile UUIDSignal ID 
1B1aAThis FlowFile is processed. But its signal is not found, and will be re-queued at the end of the queue.
2bB 
3cC 
2B1bBThis FlowFile is processed and since its signal is notified, this one will be released to 'success'.
2cC 
3aA 
3 1cCThis FlowFile will be processed at the next run.
2aA 
+ + + +

Release higher priority FlowFiles in each signal id

+ +Multiple FlowFiles share the same signal id, and the order of releasing a FlowFile is important. + +

Important configurations:

+
    +
  • Use a (or set of a) Prioritizer(s) suites your need other than FIFO, at 'wait' relationship (or the incoming connection if 'Wait Mode' is 'Keep in the upstream connection), e.g. PriorityPrioritizer
  • +
  • Specify adequate 'Wait Penalty Duration', e.g. "3 sec",
  • +
  • 'Wait Penalty Duration' should be grater than 'Run Schedule', e.g "3 sec" > "1 sec"
  • +
  • Increase 'Run Duration' to avoid the limitation of number of signal ids (see the note below)
  • +
+ +The following table illustrates the notified signal ids, queued FlowFiles and what will happen at each Wait run cycle. +The example uses PriorityPrioritizer to control the order of processing FlowFiles within a signal id. + +If 'Wait Penalty Duration' is configured, Wait processor tracks unreleased signal ids and their penalty representing when they will be checked again. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
# of Wait runNotified SignalsSignal PenaltiesQueue Index (via 'priority' attribute)FlowFile UUIDSignal ID'priority' attr 
1 (00:01)B 1a-1A1This FlowFile is processed. But its signal is not found. Penalized.
2b-1B1Since a-1 and b-1 have the same priority '1', b-1 may be processed before a-1. You can add another Prioritizer to define more specific ordering.
3b-2B2 
2 (00:02)BA (00:04)1a-1A1This FlowFile is the first one according to the configured Prioritizer, but the signal id is penalized. So, this FlowFile is skipped at this execution.
2b-1B1This FlowFile is processed.
3b-2B2 
3 (00:03) A (00:04)1a-1A1This FlowFile is the first one but is still penalized.
2b-2B2This FlowFile is processed, but its signal is not notified yet, thus will be penalized.
4 (00:04) B (00:06)1a-1A1This FlowFile is no longer penalized, and get processed. But its signal is not notified yet, thus will be penalized again.
2b-2B2 
+ +

The importance of 'Run Duration' when 'Wait Penalty Duration' is used

+ +

+There are limitation of number of signals can be checked based on the combination of 'Run Schedule' and 'Wait Penalize Duration'. +If this limitation is engaged, some FlowFiles may not be processed and remain in the 'wait' relationship even if their signal ids are notified. +Let's say Wait is configured with: +

+ +
    +
  • Run Schedule = 1 sec
  • +
  • Wait Penalize Duration = 3 sec
  • +
  • Release Signal Identifier = ${uuid}
  • +
+ +

+And there are 5 FlowFiles F1, F2 ... F5 in the 'wait' relationship. +Then the signal for F5 is notified. +Wait will work as follows: +

+ +
    +
  • At 00:00 Wait checks the signal for F1, not found, and penalize F1 (till 00:03)
  • +
  • At 00:01 Wait checks the signal for F2, not found, and penalize F2 (till 00:04)
  • +
  • At 00:02 Wait checks the signal for F3, not found, and penalize F3 (till 00:05)
  • +
  • At 00:03 Wait checks the signal for F4, not found, and penalize F4 (till 00:06)
  • +
  • At 00:04 Wait checks the signal for F1 again, because it's not penalized any longer
  • +
+Repeat above cycle, thus F5 will not be released until one of F1 ... F4 is released. + +

+To mitigate such limitation, increasing 'Run Duration' is recommended. By increasing 'Run Duration', Wait processor can keep being scheduled for that duration. For example, with 'Run Duration' 500 ms, Wait should be able to loop through all 5 queued FlowFiles at a single run. +

+ + + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java index 2ccb2fec66..7970601c3a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -205,6 +205,35 @@ public class TestWait { } } + @Test + public void testWaitPenaltyDuration() throws InitializationException { + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.WAIT_PENALTY_DURATION, "1 hour"); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "1"); + runner.enqueue(new byte[]{}, props); + + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + runner.clearTransferState(); + + // The signal id should be penalized + final Wait processor = (Wait) runner.getProcessor(); + final Map signalIdPenalties = processor.getSignalIdPenalties(); + assertEquals(1, signalIdPenalties.size()); + assertTrue(signalIdPenalties.containsKey("1")); + + // FlowFile with the penalized id shouldn't be processed + runner.enqueue(new byte[]{}, props); + + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 0); + runner.clearTransferState(); + } + @Test public void testReplaceAttributes() throws InitializationException, IOException { Map cachedAttributes = new HashMap<>();