NIFI-6387 RetryFlowFile

NIFI-6387 RetryFlowFile

NIFI-6387 Maximum Retries support FLOWFILE_ATTRIBUTES scope

NIFI-6387 Fixed reuses descriptions for clarity

This closes #3541.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Travis Neeley 2019-07-03 22:11:26 -04:00 committed by Koji Kawamura
parent 0857d5f89d
commit 44b84a678f
No known key found for this signature in database
GPG Key ID: 36136B0EC89E4758
3 changed files with 572 additions and 0 deletions

View File

@ -0,0 +1,314 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Processor that implements a bounded retry loop.
 *
 * <p>Each incoming FlowFile carries a retry counter in the attribute named by the
 * 'Retry Attribute' property, plus a companion {@code <retry attribute>.uuid} attribute holding
 * the identifier of the processor that last incremented the counter. On every trigger the counter
 * is incremented and compared against 'Maximum Retries':
 * <ul>
 *   <li>incremented counter &lt;= maximum: the counter and uuid attributes are written back and the
 *       FlowFile (optionally penalized) is routed to {@link #RETRY};</li>
 *   <li>incremented counter &gt; maximum: the counter and uuid attributes are removed, any dynamic
 *       properties are added as attributes and the FlowFile is routed to
 *       {@link #RETRIES_EXCEEDED};</li>
 *   <li>unusable counter (when configured to fail), counter owned by another processor in
 *       'fail' reuse mode, or an unresolvable 'Maximum Retries': routed to {@link #FAILURE}.</li>
 * </ul>
 */
@Tags({"Retry", "FlowFile"})
@CapabilityDescription("FlowFiles passed to this Processor have a 'Retry Attribute' value checked against a " +
        "configured 'Maximum Retries' value. If the current attribute value is below the configured maximum, the " +
        "FlowFile is passed to a retry relationship. The FlowFile may or may not be penalized in that condition. " +
        "If the FlowFile's attribute value exceeds the configured maximum, the FlowFile will be passed to a " +
        "'retries_exceeded' relationship. WARNING: If the incoming FlowFile has a non-numeric value in the " +
        "configured 'Retry Attribute' attribute, it will be reset to '1'. You may choose to fail the FlowFile " +
        "instead of performing the reset. Additional dynamic properties can be defined for any attributes you " +
        "wish to add to the FlowFiles transferred to 'retries_exceeded'. These attributes support attribute " +
        "expression language.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@SideEffectFree
@ReadsAttribute(attribute = "Retry Attribute",
        description = "Will read the attribute or attribute expression language result as defined in 'Retry Attribute'")
@WritesAttributes({
        @WritesAttribute(attribute = "Retry Attribute",
                description = "User defined retry attribute is updated with the current retry count"),
        @WritesAttribute(attribute = "Retry Attribute .uuid",
                description = "User defined retry attribute with .uuid that determines what processor " +
                        "retried the FlowFile last")
})
@DynamicProperty(name = "Exceeded FlowFile Attribute Key",
        value = "The value of the attribute added to the FlowFile",
        description = "One or more dynamic properties can be used to add attributes to FlowFiles passed to " +
                "the 'retries_exceeded' relationship",
        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
public class RetryFlowFile extends AbstractProcessor {
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    // Configuration snapshot taken in onScheduled() and read in onTrigger(). Declared volatile so
    // the values written when the processor is scheduled are visible to whichever thread runs
    // onTrigger().
    private volatile String retryAttribute;
    private volatile boolean penalizeRetried;
    private volatile boolean failOnOverwrite;
    private volatile String reuseMode;
    private volatile String lastRetriedBy;

    public static final PropertyDescriptor RETRY_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("retry-attribute")
            .displayName("Retry Attribute")
            .description("The name of the attribute that contains the current retry count for the FlowFile. " +
                    "WARNING: If the name matches an attribute already on the FlowFile that does not contain a " +
                    "numerical value, the processor will either overwrite that attribute with '1' or fail " +
                    "based on configuration.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .defaultValue("flowfile.retries")
            .build();

    public static final PropertyDescriptor MAXIMUM_RETRIES = new PropertyDescriptor.Builder()
            .name("maximum-retries")
            .displayName("Maximum Retries")
            .description("The maximum number of times a FlowFile can be retried before being " +
                    "passed to the 'retries_exceeded' relationship")
            .required(true)
            .addValidator(StandardValidators.createLongValidator(1, Integer.MAX_VALUE, true))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("3")
            .build();

    public static final PropertyDescriptor PENALIZE_RETRIED = new PropertyDescriptor.Builder()
            .name("penalize-retries")
            .displayName("Penalize Retries")
            .description("If set to 'true', this Processor will penalize input FlowFiles before passing them " +
                    "to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();

    public static final PropertyDescriptor FAIL_ON_OVERWRITE = new PropertyDescriptor.Builder()
            .name("Fail on Non-numerical Overwrite")
            .description("If the FlowFile already has the attribute defined in 'Retry Attribute' that is " +
                    "*not* a number, fail the FlowFile instead of resetting that value to '1'")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    public static final AllowableValue FAIL_ON_REUSE = new AllowableValue(
            "fail",
            "Fail on Reuse",
            "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, fail the FlowFile" +
                    " regardless of current retry count"
    );

    public static final AllowableValue WARN_ON_REUSE = new AllowableValue(
            "warn",
            "Warn on Reuse",
            "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, log a warning " +
                    "message before resetting the retry attribute and UUID for this instance"
    );

    public static final AllowableValue RESET_ON_REUSE = new AllowableValue(
            "reset",
            "Reset Reuse",
            "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, log a debug " +
                    "message before resetting the retry attribute and UUID for this instance"
    );

    public static final PropertyDescriptor REUSE_MODE = new PropertyDescriptor.Builder()
            .name("reuse-mode")
            .displayName("Reuse Mode")
            .description("Defines how the Processor behaves if the retry FlowFile has a different retry UUID than " +
                    "the instance that received the FlowFile. This generally means that the attribute was not reset " +
                    "after being successfully retried by a previous instance of this processor.")
            .required(true)
            .allowableValues(FAIL_ON_REUSE, WARN_ON_REUSE, RESET_ON_REUSE)
            .defaultValue(FAIL_ON_REUSE.getValue())
            .build();

    public static final Relationship RETRY = new Relationship.Builder()
            .name("retry")
            .description("Input FlowFile has not exceeded the configured maximum retry count, pass this " +
                    "relationship back to the input Processor to create a limited feedback loop.")
            .build();

    public static final Relationship RETRIES_EXCEEDED = new Relationship.Builder()
            .name("retries_exceeded")
            .description("Input FlowFile has exceeded the configured maximum retry count, do not pass this " +
                    "relationship back to the input Processor to terminate the limited feedback loop.")
            .build();

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("The processor is configured such that a non-numerical value on 'Retry Attribute' " +
                    "results in a failure instead of resetting that value to '1'. This will immediately " +
                    "terminate the limited feedback loop. Might also include when 'Maximum Retries' contains " +
                    "attribute expression language that does not resolve to an Integer.")
            .autoTerminateDefault(true)
            .build();

    /**
     * @return the unmodifiable set of relationships built in {@link #init(ProcessorInitializationContext)}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Builds the immutable property and relationship collections exposed by this processor.
     *
     * @param context initialization context (not used)
     */
    @Override
    protected void init(ProcessorInitializationContext context) {
        List<PropertyDescriptor> props = new ArrayList<>();
        props.add(RETRY_ATTRIBUTE);
        props.add(MAXIMUM_RETRIES);
        props.add(PENALIZE_RETRIED);
        props.add(FAIL_ON_OVERWRITE);
        props.add(REUSE_MODE);
        this.properties = Collections.unmodifiableList(props);

        Set<Relationship> rels = new HashSet<>();
        rels.add(RETRY);
        rels.add(RETRIES_EXCEEDED);
        rels.add(FAILURE);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * Describes a user-defined (dynamic) property. Each dynamic property becomes an attribute on
     * FlowFiles routed to 'retries_exceeded'; its value may use expression language evaluated
     * against the FlowFile's attributes.
     *
     * @param propertyDescriptorName name of the dynamic property, also used as the attribute name
     * @return a non-required, dynamic descriptor for the given name
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .description("Attribute " + propertyDescriptorName + " will be placed on FlowFiles " +
                        "exceeding the retry count")
                .required(false)
                .dynamic(true)
                .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .build();
    }

    /**
     * @return the unmodifiable list of properties built in {@link #init(ProcessorInitializationContext)}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Captures the processor configuration so that {@code onTrigger} does not have to re-read the
     * FlowFile-independent properties for every FlowFile.
     *
     * @param context the context providing the configured property values
     */
    @OnScheduled
    @SuppressWarnings("unused")
    public void onScheduled(final ProcessContext context) {
        retryAttribute = context.getProperty(RETRY_ATTRIBUTE).evaluateAttributeExpressions().getValue();
        penalizeRetried = context.getProperty(PENALIZE_RETRIED).asBoolean();
        failOnOverwrite = context.getProperty(FAIL_ON_OVERWRITE).asBoolean();
        reuseMode = context.getProperty(REUSE_MODE).getValue();
        // Companion attribute recording which processor instance last incremented the counter.
        lastRetriedBy = retryAttribute.concat(".uuid");
    }

    /**
     * Increments the retry counter of one FlowFile and routes it to 'retry', 'retries_exceeded'
     * or 'failure' according to the configuration captured in {@link #onScheduled(ProcessContext)}.
     *
     * @param context used to evaluate 'Maximum Retries' and the dynamic properties per FlowFile
     * @param session the session the FlowFile is pulled from and transferred through
     * @throws ProcessException declared by the overridden method; not thrown directly here
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowfile = session.get();
        if (null == flowfile) {
            return;
        }

        final String retryAttributeValue = flowfile.getAttribute(retryAttribute);
        // Counter value found on the FlowFile; stays 0 when the attribute is missing or unusable.
        int previousRetry = 0;
        int currentRetry;
        try {
            if (null == retryAttributeValue) {
                currentRetry = 1;
            } else {
                previousRetry = Integer.parseInt(retryAttributeValue.trim());
                // Saturate instead of wrapping to a negative count at Integer.MAX_VALUE, which
                // would otherwise make the FlowFile look like it had never been retried.
                currentRetry = previousRetry < Integer.MAX_VALUE ? previousRetry + 1 : Integer.MAX_VALUE;
            }
        } catch (NumberFormatException ex) {
            // Configured to fail if this was not a number
            if (failOnOverwrite) {
                getLogger().warn("FlowFile {} has a non-numerical value in retry attribute '{}', " +
                        "route to 'failure'", new Object[]{flowfile, retryAttribute});
                session.transfer(flowfile, FAILURE);
                return;
            }
            // reset to '1' if that wasn't the case
            currentRetry = 1;
        }

        final String lastRetriedByUUID = flowfile.getAttribute(lastRetriedBy);
        final String currentInstanceUUID = getIdentifier();
        // A non-blank uuid that differs from ours means another processor owns this counter.
        if (!StringUtils.isBlank(lastRetriedByUUID) && !currentInstanceUUID.equals(lastRetriedByUUID)) {
            LogLevel reuseLogLevel = LogLevel.DEBUG;
            switch (reuseMode) {
                case "fail":
                    getLogger().error("FlowFile {} was previously retried with the same attribute by a " +
                            "different processor. Route to 'failure'", new Object[]{flowfile});
                    getLogger().debug("Current Processor: {}, Previous Processor: {}, Previous Retry: {}",
                            new Object[]{currentInstanceUUID, lastRetriedByUUID, previousRetry});
                    session.transfer(flowfile, FAILURE);
                    return;
                case "warn":
                    reuseLogLevel = LogLevel.WARN;
                    // intentional fall-through: 'warn' differs from 'reset' only in log level
                case "reset":
                    getLogger().log(reuseLogLevel, "FlowFile {} was previously retried with the same attribute " +
                                    "by a different processor. Reset the current retry count to '1'. Consider " +
                                    "changing the retry attribute for this processor.",
                            new Object[]{flowfile});
                    getLogger().debug("Current Processor: {}, Previous Processor: {}, Previous Retry: {}",
                            new Object[]{currentInstanceUUID, lastRetriedByUUID, previousRetry});
                    currentRetry = 1;
                    break;
                default:
                    // Unknown mode: keep the incremented counter, as the original switch did.
                    break;
            }
        }

        // 'Maximum Retries' supports expression language against FlowFile attributes, so it has
        // to be resolved per FlowFile and may turn out not to be a number.
        Integer maximumRetries;
        try {
            maximumRetries = context.getProperty(MAXIMUM_RETRIES)
                    .evaluateAttributeExpressions(flowfile)
                    .asInteger();
            if (null == maximumRetries) {
                getLogger().warn("Could not obtain maximum retries off of FlowFile, route to 'failure'");
                session.transfer(flowfile, FAILURE);
                return;
            }
        } catch (NumberFormatException ex) {
            getLogger().warn("Maximum Retries was not a number for this FlowFile, route to 'failure'");
            session.transfer(flowfile, FAILURE);
            return;
        }

        if (currentRetry > maximumRetries) {
            // Add dynamic properties
            for (PropertyDescriptor descriptor : context.getProperties().keySet()) {
                if (!descriptor.isDynamic()) {
                    continue;
                }
                String value = context.getProperty(descriptor)
                        .evaluateAttributeExpressions(flowfile)
                        .getValue();
                if (!StringUtils.isBlank(value)) {
                    flowfile = session.putAttribute(flowfile, descriptor.getName(), value);
                }
            }
            // Clear the bookkeeping attributes once the loop is terminated.
            flowfile = session.removeAttribute(flowfile, retryAttribute);
            flowfile = session.removeAttribute(flowfile, lastRetriedBy);
            session.transfer(flowfile, RETRIES_EXCEEDED);
        } else {
            if (penalizeRetried) {
                // Keep the FlowFile reference returned by the session, as with every other
                // session operation in this method.
                flowfile = session.penalize(flowfile);
            }
            // Update and transfer
            flowfile = session.putAttribute(flowfile, retryAttribute, String.valueOf(currentRetry));
            flowfile = session.putAttribute(flowfile, lastRetriedBy, currentInstanceUUID);
            session.transfer(flowfile, RETRY);
        }
    }
}

View File

@ -101,6 +101,7 @@ org.apache.nifi.processors.standard.QueryDatabaseTableRecord
org.apache.nifi.processors.standard.QueryRecord
org.apache.nifi.processors.standard.ReplaceText
org.apache.nifi.processors.standard.ReplaceTextWithMapping
org.apache.nifi.processors.standard.RetryFlowFile
org.apache.nifi.processors.standard.RouteOnAttribute
org.apache.nifi.processors.standard.RouteOnContent
org.apache.nifi.processors.standard.RouteText

View File

@ -0,0 +1,257 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class TestRetryFlowFile {
TestRunner runner;
@Before
public void before() {
runner = TestRunners.newTestRunner(new RetryFlowFile());
}
@After
public void after() {
runner.shutdown();
}
@Test
public void testNoRetryAttribute() {
runner.enqueue("");
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 1);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
mff.assertAttributeExists("flowfile.retries");
mff.assertAttributeExists("flowfile.retries.uuid");
mff.assertAttributeEquals("flowfile.retries", "1");
return true;
});
}
@Test
public void testRetryPenalize() {
runner.enqueue("", Collections.singletonMap("flowfile.retries", "2"));
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 1);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
mff.assertAttributeExists("flowfile.retries");
mff.assertAttributeExists("flowfile.retries.uuid");
mff.assertAttributeEquals("flowfile.retries", "3");
Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
return true;
});
}
@Test
public void testRetryClustered() {
runner.setClustered(true);
runner.setThreadCount(5);
for (int i = 0; i < 5; i++) {
runner.enqueue("", Collections.singletonMap("flowfile.retries", "2"));
}
runner.run(5);
runner.assertTransferCount(RetryFlowFile.RETRY, 5);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
mff.assertAttributeExists("flowfile.retries");
mff.assertAttributeExists("flowfile.retries.uuid");
mff.assertAttributeEquals("flowfile.retries", "3");
Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
return true;
});
}
@Test
public void testRetryNoPenalize() {
runner.setProperty(RetryFlowFile.PENALIZE_RETRIED, "false");
runner.enqueue("", Collections.singletonMap("flowfile.retries", "2"));
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 1);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
mff.assertAttributeExists("flowfile.retries.uuid");
mff.assertAttributeExists("flowfile.retries");
mff.assertAttributeEquals("flowfile.retries", "3");
Assert.assertFalse("FlowFile was not penalized!", mff.isPenalized());
return true;
});
}
@Test
public void testNoFailOnOverwrite() {
runner.enqueue("", Collections.singletonMap("flowfile.retries", "ZZAaa"));
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 1);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
mff.assertAttributeExists("flowfile.retries.uuid");
mff.assertAttributeExists("flowfile.retries");
mff.assertAttributeEquals("flowfile.retries", "1");
Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
return true;
});
}
@Test
public void testFailOnOverwrite() {
runner.setProperty(RetryFlowFile.FAIL_ON_OVERWRITE, "true");
runner.enqueue("", Collections.singletonMap("flowfile.retries", "ZZAaa"));
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 0);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 1);
}
@Test
public void testRetriesExceeded() {
runner.setProperty("exceeded.time", "${now():toString()}");
runner.setProperty("reason", "${uuid} exceeded retries");
runner.enqueue("", Collections.singletonMap("flowfile.retries", "3"));
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 0);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 1);
runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
runner.assertAllConditionsMet(RetryFlowFile.RETRIES_EXCEEDED, mff -> {
mff.assertAttributeExists("exceeded.time");
mff.assertAttributeExists("reason");
Assert.assertFalse("Expression language not evaluated!",
mff.getAttribute("reason").contains("${uuid}"));
return true;
});
}
@Test
public void testReuseFail() {
runner.setProperty(RetryFlowFile.REUSE_MODE, RetryFlowFile.FAIL_ON_REUSE.getValue());
Map<String, String> inputAttributes = new HashMap<>();
inputAttributes.put("flowfile.retries", "2");
inputAttributes.put("flowfile.retries.uuid", "1122334455");
runner.enqueue("", inputAttributes);
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 0);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 1);
}
@Test
public void testReuseWarn() {
runner.setProperty(RetryFlowFile.REUSE_MODE, RetryFlowFile.WARN_ON_REUSE.getValue());
Map<String, String> inputAttributes = new HashMap<>();
inputAttributes.put("flowfile.retries", "2");
inputAttributes.put("flowfile.retries.uuid", "1122334455");
runner.enqueue("", inputAttributes);
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 1);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
mff.assertAttributeExists("flowfile.retries");
mff.assertAttributeEquals("flowfile.retries", "1");
return true;
});
}
@Test
public void testReuseReset() {
runner.setProperty(RetryFlowFile.REUSE_MODE, RetryFlowFile.RESET_ON_REUSE.getValue());
Map<String, String> inputAttributes = new HashMap<>();
inputAttributes.put("flowfile.retries", "2");
inputAttributes.put("flowfile.retries.uuid", "1122334455");
runner.enqueue("", inputAttributes);
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 1);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
mff.assertAttributeExists("flowfile.retries");
mff.assertAttributeEquals("flowfile.retries", "1");
return true;
});
}
@Test
public void testAlternativeAttributeMaxRetries() {
runner.setProperty(RetryFlowFile.MAXIMUM_RETRIES, "${retry.max}");
Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("retry.max", "3");
attributeMap.put("flowfile.retries", "2");
runner.enqueue("", attributeMap);
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 1);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
mff.assertAttributeExists("flowfile.retries");
mff.assertAttributeExists("flowfile.retries.uuid");
mff.assertAttributeEquals("flowfile.retries", "3");
Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
return true;
});
}
@Test
public void testInvalidAlternativeAttributeMaxRetries() {
runner.setProperty(RetryFlowFile.MAXIMUM_RETRIES, "${retry.max}");
Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("retry.max", "NiFi");
attributeMap.put("flowfile.retries", "2");
runner.enqueue("", attributeMap);
runner.run();
runner.assertTransferCount(RetryFlowFile.RETRY, 0);
runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
runner.assertTransferCount(RetryFlowFile.FAILURE, 1);
}
}