NIFI-3452: Add Wait processor Wait Mode property

Ensure back-pressure is active until downstream processing completes.

This closes #1490.
This commit is contained in:
Koji Kawamura 2017-02-09 21:30:43 +09:00 committed by Pierre Villard
parent 9cfc13423d
commit e46f4131f9
2 changed files with 95 additions and 41 deletions

View File

@ -82,24 +82,27 @@ public class Wait extends AbstractProcessor {
// Identifies the distributed map cache client // Identifies the distributed map cache client
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service") .name("distributed-cache-service")
.description("The Controller Service that is used to check for release signals from a corresponding Notify processor") .displayName("Distributed Cache Service")
.required(true) .description("The Controller Service that is used to check for release signals from a corresponding Notify processor")
.identifiesControllerService(AtomicDistributedMapCacheClient.class) .required(true)
.build(); .identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key // Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder() public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Release Signal Identifier") .name("release-signal-id")
.description("A value, or the results of an Attribute Expression Language statement, which will " + .displayName("Release Signal Identifier")
"be evaluated against a FlowFile in order to determine the release signal cache key") .description("A value, or the results of an Attribute Expression Language statement, which will " +
.required(true) "be evaluated against a FlowFile in order to determine the release signal cache key")
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) .required(true)
.expressionLanguageSupported(true) .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.build(); .expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder() public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
.name("Target Signal Count") .name("target-signal-count")
.displayName("Target Signal Count")
.description("A value, or the results of an Attribute Expression Language statement, which will " + .description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the target signal count. " + "be evaluated against a FlowFile in order to determine the target signal count. " +
"This processor checks whether the signal count has reached this number. " + "This processor checks whether the signal count has reached this number. " +
@ -112,7 +115,8 @@ public class Wait extends AbstractProcessor {
.build(); .build();
public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
.name("Signal Counter Name") .name("signal-counter-name")
.displayName("Signal Counter Name")
.description("A value, or the results of an Attribute Expression Language statement, which will " + .description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the signal counter name. " + "be evaluated against a FlowFile in order to determine the signal counter name. " +
"If not specified, this processor checks the total count in a signal.") "If not specified, this processor checks the total count in a signal.")
@ -123,13 +127,14 @@ public class Wait extends AbstractProcessor {
// Selects the FlowFile attribute or expression, whose value is used as cache key // Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder() public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
.name("Expiration Duration") .name("expiration-duration")
.description("Indicates the duration after which waiting flow files will be routed to the 'expired' relationship") .displayName("Expiration Duration")
.required(true) .description("Indicates the duration after which waiting flow files will be routed to the 'expired' relationship")
.defaultValue("10 min") .required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .defaultValue("10 min")
.expressionLanguageSupported(false) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build(); .expressionLanguageSupported(false)
.build();
public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new AllowableValue("replace", "Replace if present", public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new AllowableValue("replace", "Replace if present",
"When cached attributes are copied onto released FlowFiles, they replace any matching attributes."); "When cached attributes are copied onto released FlowFiles, they replace any matching attributes.");
@ -138,33 +143,54 @@ public class Wait extends AbstractProcessor {
"Attributes on released FlowFiles are not overwritten by copied cached attributes."); "Attributes on released FlowFiles are not overwritten by copied cached attributes.");
public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder() public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder()
.name("Attribute Copy Mode") .name("attribute-copy-mode")
.description("Specifies how to handle attributes copied from flow files entering the Notify processor") .displayName("Attribute Copy Mode")
.defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue()) .description("Specifies how to handle attributes copied from flow files entering the Notify processor")
.required(true) .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
.allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL) .required(true)
.expressionLanguageSupported(false) .allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL)
.build(); .expressionLanguageSupported(false)
.build();
public static final AllowableValue WAIT_MODE_TRANSFER_TO_WAIT = new AllowableValue("wait", "Transfer to wait relationship",
"Transfer a FlowFile to the 'wait' relationship when whose release signal has not been notified yet." +
" This mode allows other incoming FlowFiles to be enqueued by moving FlowFiles into the wait relationship.");
public static final AllowableValue WAIT_MODE_KEEP_IN_UPSTREAM = new AllowableValue("keep", "Keep in the upstream connection",
"Transfer a FlowFile to the upstream connection where it comes from when whose release signal has not been notified yet." +
" This mode helps keeping upstream connection being full so that the upstream source processor" +
" will not be scheduled while back-pressure is active and limit incoming FlowFiles. ");
public static final PropertyDescriptor WAIT_MODE = new PropertyDescriptor.Builder()
.name("wait-mode")
.displayName("Wait Mode")
.description("Specifies how to handle a FlowFile waiting for a notify signal")
.defaultValue(WAIT_MODE_TRANSFER_TO_WAIT.getValue())
.required(true)
.allowableValues(WAIT_MODE_TRANSFER_TO_WAIT, WAIT_MODE_KEEP_IN_UPSTREAM)
.expressionLanguageSupported(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("A FlowFile with a matching release signal in the cache will be routed to this relationship") .description("A FlowFile with a matching release signal in the cache will be routed to this relationship")
.build(); .build();
public static final Relationship REL_FAILURE = new Relationship.Builder() public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure") .name("failure")
.description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship") .description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship")
.build(); .build();
public static final Relationship REL_WAIT = new Relationship.Builder() public static final Relationship REL_WAIT = new Relationship.Builder()
.name("wait") .name("wait")
.description("A FlowFile with no matching release signal in the cache will be routed to this relationship") .description("A FlowFile with no matching release signal in the cache will be routed to this relationship")
.build(); .build();
public static final Relationship REL_EXPIRED = new Relationship.Builder() public static final Relationship REL_EXPIRED = new Relationship.Builder()
.name("expired") .name("expired")
.description("A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship") .description("A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship")
.build(); .build();
private final Set<Relationship> relationships; private final Set<Relationship> relationships;
public Wait() { public Wait() {
@ -185,6 +211,7 @@ public class Wait extends AbstractProcessor {
descriptors.add(EXPIRATION_DURATION); descriptors.add(EXPIRATION_DURATION);
descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_COPY_MODE); descriptors.add(ATTRIBUTE_COPY_MODE);
descriptors.add(WAIT_MODE);
return descriptors; return descriptors;
} }
@ -259,7 +286,17 @@ public class Wait extends AbstractProcessor {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile}); logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile});
} }
session.transfer(flowFile, REL_WAIT);
final String waitMode = context.getProperty(WAIT_MODE).getValue();
if (WAIT_MODE_TRANSFER_TO_WAIT.getValue().equals(waitMode)) {
session.transfer(flowFile, REL_WAIT);
} else if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
// Transfer to self.
session.transfer(flowFile);
} else {
throw new ProcessException("Unsupported wait mode " + waitMode + " was specified.");
}
return; return;
} }

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.apache.nifi.processors.standard.TestNotify.MockCacheClient; import org.apache.nifi.processors.standard.TestNotify.MockCacheClient;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@ -62,6 +63,22 @@ public class TestWait {
runner.clearTransferState(); runner.clearTransferState();
} }
@Test
public void testWaitKeepInUpstreamConnection() throws InitializationException {
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.WAIT_MODE, Wait.WAIT_MODE_KEEP_IN_UPSTREAM);
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "1");
runner.enqueue(new byte[]{}, props);
runner.run();
// The FlowFile stays in the upstream connection.
runner.assertQueueNotEmpty();
runner.clearTransferState();
}
@Test @Test
public void testExpired() throws InitializationException, InterruptedException { public void testExpired() throws InitializationException, InterruptedException {
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");