diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index b5ff4ffae5..cee8d2227e 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -28,7 +28,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -108,21 +107,17 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .description("All successful FlowFiles are routed to this relationship").name("success").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .description("All flowfiles that cannot be updated are routed to this relationship").name("failure").autoTerminateDefault(true).build(); public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder() .description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here.").name("set state fail").build(); static { Set tempStatelessSet = new HashSet<>(); tempStatelessSet.add(REL_SUCCESS); - tempStatelessSet.add(REL_FAILURE); statelessRelationshipSet = Collections.unmodifiableSet(tempStatelessSet); Set tempStatefulSet = new HashSet<>(); tempStatefulSet.add(REL_SUCCESS); - tempStatefulSet.add(REL_FAILURE); tempStatefulSet.add(REL_FAILED_SET_STATE); statefulRelationshipSet = Collections.unmodifiableSet(tempStatefulSet); @@ -157,21 +152,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } }; - public static final AllowableValue FAIL_STOP = new AllowableValue("stop", "Penalize", "Penalize FlowFiles." + - "This is based on the original behavior of the processor to allow for a smooth transition."); - public static final AllowableValue FAIL_ROUTE = new AllowableValue("route", "Route to Failure Relationship", - "If chosen, failed FlowFiles will be routed to the failure relationship."); - public static final PropertyDescriptor FAILURE_BEHAVIOR = new PropertyDescriptor.Builder() - .name("update-attribute-failure-behavior") - .displayName("Failure Behavior") - .description("Control how to handle errors in Expression Language evaluation. The default behavior is to stop evaluation. It can be " + - "changed by the user to route to a failure relationship instead.") - .allowableValues(FAIL_STOP, FAIL_ROUTE) - .defaultValue(FAIL_STOP.getValue()) - .required(true) - .build(); - - // static properties public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression"; public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder() @@ -225,7 +205,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { descriptors.add(DELETE_ATTRIBUTES); descriptors.add(STORE_STATE); descriptors.add(STATEFUL_VARIABLES_INIT_VALUE); - descriptors.add(FAILURE_BEHAVIOR); return Collections.unmodifiableList(descriptors); } @@ -473,51 +452,39 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { Map defaultActions = this.defaultActions; List flowFilesToTransfer = new LinkedList<>(); - boolean routeToFailure = context.getProperty(FAILURE_BEHAVIOR).getValue().equals(FAIL_ROUTE.getValue()); - try { - // if there is update criteria specified, evaluate it - if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) { - // apply the actions for each rule and transfer the flowfile - for (final Map.Entry> entry : matchedRules.entrySet()) { - FlowFile match = entry.getKey(); - final List rules = entry.getValue(); - boolean updateWorking = incomingFlowFile.equals(match); + // if there is update criteria specified, evaluate it + if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) { + // apply the actions for each rule and transfer the flowfile + for (final Map.Entry> entry : matchedRules.entrySet()) { + FlowFile match = entry.getKey(); + final List rules = entry.getValue(); + boolean updateWorking = incomingFlowFile.equals(match); - // execute each matching rule(s) - match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes); + // execute each matching rule(s) + match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes); - if (updateWorking) { - incomingFlowFile = match; - } - - if (debugEnabled) { - logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); - } - - // add the match to the list to transfer - flowFilesToTransfer.add(match); + if (updateWorking) { + incomingFlowFile = match; } - } else { - // Either we're running without any rules or the FlowFile didn't match any - incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes); if (debugEnabled) { - logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()}); + logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); } - // add the flowfile to the list to transfer - flowFilesToTransfer.add(incomingFlowFile); + // add the match to the list to transfer + flowFilesToTransfer.add(match); } - } catch (ProcessException pe) { - if (routeToFailure) { - session.transfer(incomingFlowFile, REL_FAILURE); - getLogger().error("Failed to update flowfile attribute(s).", pe); - return; - } else { - throw pe; - } - } + } else { + // Either we're running without any rules or the FlowFile didn't match any + incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes); + if (debugEnabled) { + logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()}); + } + + // add the flowfile to the list to transfer + flowFilesToTransfer.add(incomingFlowFile); + } if (stateInitialAttributes != null) { try { @@ -779,8 +746,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { final Map defaultActions = new HashMap<>(); for (final Map.Entry entry : properties.entrySet()) { - if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE - && entry.getKey() != FAILURE_BEHAVIOR) { + if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) { final Action action = new Action(); action.setAttribute(entry.getKey().getName()); action.setValue(entry.getValue()); diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index 35b5536b51..50938e673c 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -1005,33 +1005,4 @@ public class TestUpdateAttribute { } } - @Test - public void testInvalidExpressionLanguage() { - final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setVariable("test", "Squirrel!!1!"); - runner.setProperty("bad_attr", "${test:toDate('yyyy-MM-dd')}"); - runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_ROUTE); - runner.assertValid(); - - runner.enqueue("Test"); - runner.run(); - - runner.assertTransferCount(UpdateAttribute.REL_SUCCESS, 0); - runner.assertTransferCount(UpdateAttribute.REL_FAILURE, 1); - - runner.clearTransferState(); - - Throwable ex = null; - try { - runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_STOP); - runner.enqueue("Test"); - runner.run(); - } catch (Throwable t) { - ex = t; - } finally { - Assert.assertNotNull(ex); - Assert.assertTrue(ex.getCause() instanceof ProcessException); - runner.assertQueueNotEmpty(); - } - } }