mirror of https://github.com/apache/nifi.git
NIFI-5448 Added failure relationship to UpdateAttributes to handle bad expression language logic.
This closes #2914 Signed-off-by: zenfenan <zenfenan@apache.org>
This commit is contained in:
parent
451084e11f
commit
32ee552ada
|
@ -45,6 +45,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
|
|||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
|
@ -99,17 +100,21 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
|||
// relationships
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.description("All successful FlowFiles are routed to this relationship").name("success").build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.description("All flowfiles that cannot be updated are routed to this relationship").name("failure").autoTerminateDefault(true).build();
|
||||
public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder()
|
||||
.description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here.").name("set state fail").build();
|
||||
|
||||
static {
|
||||
Set<Relationship> tempStatelessSet = new HashSet<>();
|
||||
tempStatelessSet.add(REL_SUCCESS);
|
||||
tempStatelessSet.add(REL_FAILURE);
|
||||
|
||||
statelessRelationshipSet = Collections.unmodifiableSet(tempStatelessSet);
|
||||
|
||||
Set<Relationship> tempStatefulSet = new HashSet<>();
|
||||
tempStatefulSet.add(REL_SUCCESS);
|
||||
tempStatefulSet.add(REL_FAILURE);
|
||||
tempStatefulSet.add(REL_FAILED_SET_STATE);
|
||||
|
||||
statefulRelationshipSet = Collections.unmodifiableSet(tempStatefulSet);
|
||||
|
@ -144,6 +149,21 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
|||
}
|
||||
};
|
||||
|
||||
public static final AllowableValue FAIL_STOP = new AllowableValue("stop", "Penalize", "Penalize FlowFiles." +
|
||||
"This is based on the original behavior of the processor to allow for a smooth transition.");
|
||||
public static final AllowableValue FAIL_ROUTE = new AllowableValue("route", "Route to Failure Relationship",
|
||||
"If chosen, failed FlowFiles will be routed to the failure relationship.");
|
||||
public static final PropertyDescriptor FAILURE_BEHAVIOR = new PropertyDescriptor.Builder()
|
||||
.name("update-attribute-failure-behavior")
|
||||
.displayName("Failure Behavior")
|
||||
.description("Control how to handle errors in Expression Language evaluation. The default behavior is to stop evaluation. It can be " +
|
||||
"changed by the user to route to a failure relationship instead.")
|
||||
.allowableValues(FAIL_STOP, FAIL_ROUTE)
|
||||
.defaultValue(FAIL_STOP.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
|
||||
// static properties
|
||||
public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression";
|
||||
public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
|
||||
|
@ -197,6 +217,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
|||
descriptors.add(DELETE_ATTRIBUTES);
|
||||
descriptors.add(STORE_STATE);
|
||||
descriptors.add(STATEFUL_VARIABLES_INIT_VALUE);
|
||||
descriptors.add(FAILURE_BEHAVIOR);
|
||||
return Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
|
@ -444,40 +465,52 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
|||
Map<String, Action> defaultActions = this.defaultActions;
|
||||
List<FlowFile> flowFilesToTransfer = new LinkedList<>();
|
||||
|
||||
// if there is update criteria specified, evaluate it
|
||||
if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
|
||||
// apply the actions for each rule and transfer the flowfile
|
||||
for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) {
|
||||
FlowFile match = entry.getKey();
|
||||
final List<Rule> rules = entry.getValue();
|
||||
boolean updateWorking = incomingFlowFile.equals(match);
|
||||
boolean routeToFailure = context.getProperty(FAILURE_BEHAVIOR).getValue().equals(FAIL_ROUTE.getValue());
|
||||
try {
|
||||
// if there is update criteria specified, evaluate it
|
||||
if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
|
||||
// apply the actions for each rule and transfer the flowfile
|
||||
for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) {
|
||||
FlowFile match = entry.getKey();
|
||||
final List<Rule> rules = entry.getValue();
|
||||
boolean updateWorking = incomingFlowFile.equals(match);
|
||||
|
||||
// execute each matching rule(s)
|
||||
match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes);
|
||||
// execute each matching rule(s)
|
||||
match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes);
|
||||
|
||||
if (updateWorking) {
|
||||
incomingFlowFile = match;
|
||||
if (updateWorking) {
|
||||
incomingFlowFile = match;
|
||||
}
|
||||
|
||||
if (debugEnabled) {
|
||||
logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
|
||||
}
|
||||
|
||||
// add the match to the list to transfer
|
||||
flowFilesToTransfer.add(match);
|
||||
}
|
||||
} else {
|
||||
// Either we're running without any rules or the FlowFile didn't match any
|
||||
incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes);
|
||||
|
||||
if (debugEnabled) {
|
||||
logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
|
||||
logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()});
|
||||
}
|
||||
|
||||
// add the match to the list to transfer
|
||||
flowFilesToTransfer.add(match);
|
||||
// add the flowfile to the list to transfer
|
||||
flowFilesToTransfer.add(incomingFlowFile);
|
||||
}
|
||||
} else {
|
||||
// Either we're running without any rules or the FlowFile didn't match any
|
||||
incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes);
|
||||
|
||||
if (debugEnabled) {
|
||||
logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()});
|
||||
} catch (ProcessException pe) {
|
||||
if (routeToFailure) {
|
||||
session.transfer(incomingFlowFile, REL_FAILURE);
|
||||
getLogger().error("Failed to update flowfile attribute(s).", pe);
|
||||
return;
|
||||
} else {
|
||||
throw pe;
|
||||
}
|
||||
|
||||
// add the flowfile to the list to transfer
|
||||
flowFilesToTransfer.add(incomingFlowFile);
|
||||
}
|
||||
|
||||
|
||||
if (stateInitialAttributes != null) {
|
||||
try {
|
||||
// Able to use "equals()" since we're just checking if the map was modified at all
|
||||
|
@ -713,7 +746,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
|||
final Map<String, Action> defaultActions = new HashMap<>();
|
||||
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
|
||||
if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) {
|
||||
if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE
|
||||
&& entry.getKey() != FAILURE_BEHAVIOR) {
|
||||
final Action action = new Action();
|
||||
action.setAttribute(entry.getKey().getName());
|
||||
action.setValue(entry.getValue());
|
||||
|
|
|
@ -1005,4 +1005,33 @@ public class TestUpdateAttribute {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidExpressionLanguage() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
|
||||
runner.setVariable("test", "Squirrel!!1!");
|
||||
runner.setProperty("bad_attr", "${test:toDate('yyyy-MM-dd')}");
|
||||
runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_ROUTE);
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue("Test");
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(UpdateAttribute.REL_SUCCESS, 0);
|
||||
runner.assertTransferCount(UpdateAttribute.REL_FAILURE, 1);
|
||||
|
||||
runner.clearTransferState();
|
||||
|
||||
Throwable ex = null;
|
||||
try {
|
||||
runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_STOP);
|
||||
runner.enqueue("Test");
|
||||
runner.run();
|
||||
} catch (Throwable t) {
|
||||
ex = t;
|
||||
} finally {
|
||||
Assert.assertNotNull(ex);
|
||||
Assert.assertTrue(ex.getCause() instanceof ProcessException);
|
||||
runner.assertQueueNotEmpty();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue