From e46f4131f9eade2eb9fe108241107688795904f5 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Thu, 9 Feb 2017 21:30:43 +0900 Subject: [PATCH] NIFI-3452: Add Wait processor Wait Mode property Ensure back-pressure is active until downstream processing completes. This closes #1490. --- .../apache/nifi/processors/standard/Wait.java | 119 ++++++++++++------ .../nifi/processors/standard/TestWait.java | 17 +++ 2 files changed, 95 insertions(+), 41 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index 0bd5ca68b1..8ae85d8357 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -82,24 +82,27 @@ public class Wait extends AbstractProcessor { // Identifies the distributed map cache client public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() - .name("Distributed Cache Service") - .description("The Controller Service that is used to check for release signals from a corresponding Notify processor") - .required(true) - .identifiesControllerService(AtomicDistributedMapCacheClient.class) - .build(); + .name("distributed-cache-service") + .displayName("Distributed Cache Service") + .description("The Controller Service that is used to check for release signals from a corresponding Notify processor") + .required(true) + .identifiesControllerService(AtomicDistributedMapCacheClient.class) + .build(); // Selects the FlowFile attribute or expression, whose value is used as cache key public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder() - .name("Release Signal Identifier") - .description("A value, or the results of an Attribute Expression Language statement, which will " + - "be evaluated against a FlowFile in order to determine the release signal cache key") - .required(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) - .expressionLanguageSupported(true) - .build(); + .name("release-signal-id") + .displayName("Release Signal Identifier") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the release signal cache key") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder() - .name("Target Signal Count") + .name("target-signal-count") + .displayName("Target Signal Count") .description("A value, or the results of an Attribute Expression Language statement, which will " + "be evaluated against a FlowFile in order to determine the target signal count. " + "This processor checks whether the signal count has reached this number. " + @@ -112,7 +115,8 @@ public class Wait extends AbstractProcessor { .build(); public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder() - .name("Signal Counter Name") + .name("signal-counter-name") + .displayName("Signal Counter Name") .description("A value, or the results of an Attribute Expression Language statement, which will " + "be evaluated against a FlowFile in order to determine the signal counter name. " + "If not specified, this processor checks the total count in a signal.") @@ -123,13 +127,14 @@ public class Wait extends AbstractProcessor { // Selects the FlowFile attribute or expression, whose value is used as cache key public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder() - .name("Expiration Duration") - .description("Indicates the duration after which waiting flow files will be routed to the 'expired' relationship") - .required(true) - .defaultValue("10 min") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("expiration-duration") + .displayName("Expiration Duration") + .description("Indicates the duration after which waiting flow files will be routed to the 'expired' relationship") + .required(true) + .defaultValue("10 min") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new AllowableValue("replace", "Replace if present", "When cached attributes are copied onto released FlowFiles, they replace any matching attributes."); @@ -138,33 +143,54 @@ public class Wait extends AbstractProcessor { "Attributes on released FlowFiles are not overwritten by copied cached attributes."); public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder() - .name("Attribute Copy Mode") - .description("Specifies how to handle attributes copied from flow files entering the Notify processor") - .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue()) - .required(true) - .allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL) - .expressionLanguageSupported(false) - .build(); + .name("attribute-copy-mode") + .displayName("Attribute Copy Mode") + .description("Specifies how to handle attributes copied from flow files entering the Notify processor") + .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue()) + .required(true) + .allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL) + .expressionLanguageSupported(false) + .build(); + + public static final AllowableValue WAIT_MODE_TRANSFER_TO_WAIT = new AllowableValue("wait", "Transfer to wait relationship", + "Transfer a FlowFile to the 'wait' relationship when whose release signal has not been notified yet." + + " This mode allows other incoming FlowFiles to be enqueued by moving FlowFiles into the wait relationship."); + + public static final AllowableValue WAIT_MODE_KEEP_IN_UPSTREAM = new AllowableValue("keep", "Keep in the upstream connection", + "Transfer a FlowFile to the upstream connection where it comes from when whose release signal has not been notified yet." + + " This mode helps keeping upstream connection being full so that the upstream source processor" + + " will not be scheduled while back-pressure is active and limit incoming FlowFiles. "); + + public static final PropertyDescriptor WAIT_MODE = new PropertyDescriptor.Builder() + .name("wait-mode") + .displayName("Wait Mode") + .description("Specifies how to handle a FlowFile waiting for a notify signal") + .defaultValue(WAIT_MODE_TRANSFER_TO_WAIT.getValue()) + .required(true) + .allowableValues(WAIT_MODE_TRANSFER_TO_WAIT, WAIT_MODE_KEEP_IN_UPSTREAM) + .expressionLanguageSupported(false) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile with a matching release signal in the cache will be routed to this relationship") - .build(); + .name("success") + .description("A FlowFile with a matching release signal in the cache will be routed to this relationship") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship") - .build(); + .name("failure") + .description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship") + .build(); public static final Relationship REL_WAIT = new Relationship.Builder() - .name("wait") - .description("A FlowFile with no matching release signal in the cache will be routed to this relationship") - .build(); + .name("wait") + .description("A FlowFile with no matching release signal in the cache will be routed to this relationship") + .build(); public static final Relationship REL_EXPIRED = new Relationship.Builder() - .name("expired") - .description("A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship") - .build(); + .name("expired") + .description("A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship") + .build(); + private final Set relationships; public Wait() { @@ -185,6 +211,7 @@ public class Wait extends AbstractProcessor { descriptors.add(EXPIRATION_DURATION); descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(ATTRIBUTE_COPY_MODE); + descriptors.add(WAIT_MODE); return descriptors; } @@ -259,7 +286,17 @@ public class Wait extends AbstractProcessor { if (logger.isDebugEnabled()) { logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile}); } - session.transfer(flowFile, REL_WAIT); + + + final String waitMode = context.getProperty(WAIT_MODE).getValue(); + if (WAIT_MODE_TRANSFER_TO_WAIT.getValue().equals(waitMode)) { + session.transfer(flowFile, REL_WAIT); + } else if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) { + // Transfer to self. + session.transfer(flowFile); + } else { + throw new ProcessException("Unsupported wait mode " + waitMode + " was specified."); + } return; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java index e1117d53eb..0ce0045d08 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.UUID; + import org.apache.nifi.processors.standard.TestNotify.MockCacheClient; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; @@ -62,6 +63,22 @@ public class TestWait { runner.clearTransferState(); } + @Test + public void testWaitKeepInUpstreamConnection() throws InitializationException { + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.WAIT_MODE, Wait.WAIT_MODE_KEEP_IN_UPSTREAM); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "1"); + runner.enqueue(new byte[]{}, props); + + runner.run(); + + // The FlowFile stays in the upstream connection. + runner.assertQueueNotEmpty(); + runner.clearTransferState(); + } + @Test public void testExpired() throws InitializationException, InterruptedException { runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");