diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index 9ffd450eb3..0147608045 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -45,6 +45,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -99,17 +100,21 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .description("All successful FlowFiles are routed to this relationship").name("success").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .description("All flowfiles that cannot be updated are routed to this relationship").name("failure").autoTerminateDefault(true).build(); public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder() .description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here.").name("set state fail").build(); static { Set tempStatelessSet = new HashSet<>(); tempStatelessSet.add(REL_SUCCESS); + tempStatelessSet.add(REL_FAILURE); statelessRelationshipSet = Collections.unmodifiableSet(tempStatelessSet); Set tempStatefulSet = new HashSet<>(); tempStatefulSet.add(REL_SUCCESS); + tempStatefulSet.add(REL_FAILURE); tempStatefulSet.add(REL_FAILED_SET_STATE); statefulRelationshipSet = Collections.unmodifiableSet(tempStatefulSet); @@ -144,6 +149,21 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } }; + public static final AllowableValue FAIL_STOP = new AllowableValue("stop", "Penalize", "Penalize FlowFiles." + + "This is based on the original behavior of the processor to allow for a smooth transition."); + public static final AllowableValue FAIL_ROUTE = new AllowableValue("route", "Route to Failure Relationship", + "If chosen, failed FlowFiles will be routed to the failure relationship."); + public static final PropertyDescriptor FAILURE_BEHAVIOR = new PropertyDescriptor.Builder() + .name("update-attribute-failure-behavior") + .displayName("Failure Behavior") + .description("Control how to handle errors in Expression Language evaluation. The default behavior is to stop evaluation. It can be " + + "changed by the user to route to a failure relationship instead.") + .allowableValues(FAIL_STOP, FAIL_ROUTE) + .defaultValue(FAIL_STOP.getValue()) + .required(true) + .build(); + + // static properties public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression"; public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder() @@ -197,6 +217,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { descriptors.add(DELETE_ATTRIBUTES); descriptors.add(STORE_STATE); descriptors.add(STATEFUL_VARIABLES_INIT_VALUE); + descriptors.add(FAILURE_BEHAVIOR); return Collections.unmodifiableList(descriptors); } @@ -444,40 +465,52 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { Map defaultActions = this.defaultActions; List flowFilesToTransfer = new LinkedList<>(); - // if there is update criteria specified, evaluate it - if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) { - // apply the actions for each rule and transfer the flowfile - for (final Map.Entry> entry : matchedRules.entrySet()) { - FlowFile match = entry.getKey(); - final List rules = entry.getValue(); - boolean updateWorking = incomingFlowFile.equals(match); + boolean routeToFailure = context.getProperty(FAILURE_BEHAVIOR).getValue().equals(FAIL_ROUTE.getValue()); + try { + // if there is update criteria specified, evaluate it + if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) { + // apply the actions for each rule and transfer the flowfile + for (final Map.Entry> entry : matchedRules.entrySet()) { + FlowFile match = entry.getKey(); + final List rules = entry.getValue(); + boolean updateWorking = incomingFlowFile.equals(match); - // execute each matching rule(s) - match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes); + // execute each matching rule(s) + match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes); - if (updateWorking) { - incomingFlowFile = match; + if (updateWorking) { + incomingFlowFile = match; + } + + if (debugEnabled) { + logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); + } + + // add the match to the list to transfer + flowFilesToTransfer.add(match); } + } else { + // Either we're running without any rules or the FlowFile didn't match any + incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes); if (debugEnabled) { - logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); + logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()}); } - // add the match to the list to transfer - flowFilesToTransfer.add(match); + // add the flowfile to the list to transfer + flowFilesToTransfer.add(incomingFlowFile); } - } else { - // Either we're running without any rules or the FlowFile didn't match any - incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes); - - if (debugEnabled) { - logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()}); + } catch (ProcessException pe) { + if (routeToFailure) { + session.transfer(incomingFlowFile, REL_FAILURE); + getLogger().error("Failed to update flowfile attribute(s).", pe); + return; + } else { + throw pe; } - - // add the flowfile to the list to transfer - flowFilesToTransfer.add(incomingFlowFile); } + if (stateInitialAttributes != null) { try { // Able to use "equals()" since we're just checking if the map was modified at all @@ -713,7 +746,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { final Map defaultActions = new HashMap<>(); for (final Map.Entry entry : properties.entrySet()) { - if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) { + if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE + && entry.getKey() != FAILURE_BEHAVIOR) { final Action action = new Action(); action.setAttribute(entry.getKey().getName()); action.setValue(entry.getValue()); diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index 50938e673c..35b5536b51 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -1005,4 +1005,33 @@ public class TestUpdateAttribute { } } + @Test + public void testInvalidExpressionLanguage() { + final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setVariable("test", "Squirrel!!1!"); + runner.setProperty("bad_attr", "${test:toDate('yyyy-MM-dd')}"); + runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_ROUTE); + runner.assertValid(); + + runner.enqueue("Test"); + runner.run(); + + runner.assertTransferCount(UpdateAttribute.REL_SUCCESS, 0); + runner.assertTransferCount(UpdateAttribute.REL_FAILURE, 1); + + runner.clearTransferState(); + + Throwable ex = null; + try { + runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_STOP); + runner.enqueue("Test"); + runner.run(); + } catch (Throwable t) { + ex = t; + } finally { + Assert.assertNotNull(ex); + Assert.assertTrue(ex.getCause() instanceof ProcessException); + runner.assertQueueNotEmpty(); + } + } }