From 8acbe9aa3f72c4da3cc5d19f9827718b2ce8cc56 Mon Sep 17 00:00:00 2001 From: jpercivall Date: Thu, 29 Dec 2016 13:45:47 -0500 Subject: [PATCH] NIFI-1582 Fixing how failures with state are handled and improving flowfile handling This closes #1371 Signed-off-by: Bryan Rosander --- .../attributes/UpdateAttribute.java | 128 +++++++++------ .../additionalDetails.html | 123 +++++++++----- .../attributes/TestUpdateAttribute.java | 151 +++++++++++++++++- 3 files changed, 311 insertions(+), 91 deletions(-) diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index 4dee379f9f..699c8e2363 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -44,7 +45,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -86,10 +86,9 @@ import org.apache.nifi.update.attributes.serde.CriteriaSerDe; public class UpdateAttribute extends AbstractProcessor implements Searchable { - public static final String DO_NOT_STORE_STATE = "do not store state"; - public static final String STORE_STATE_LOCALLY = "store state locally"; + public static final String DO_NOT_STORE_STATE = "Do not store state"; + public static final String STORE_STATE_LOCALLY = "Store state locally"; - private boolean stateful = false; private final AtomicReference criteriaCache = new AtomicReference<>(null); private final ConcurrentMap propertyValues = new ConcurrentHashMap<>(); @@ -179,6 +178,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { private volatile Map defaultActions; private volatile boolean debugEnabled; + private volatile boolean stateful = false; public UpdateAttribute() { @@ -275,11 +275,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { debugEnabled = getLogger().isDebugEnabled(); } - @OnStopped - public void onStopped() { - defaultActions = null; - } - @Override protected Collection customValidate(final ValidationContext context) { final List reasons = new ArrayList<>(super.customValidate(context)); @@ -411,8 +406,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { final ComponentLog logger = getLogger(); final Criteria criteria = criteriaCache.get(); - FlowFile flowFile = session.get(); - if (flowFile == null) { + FlowFile incomingFlowFile = session.get(); + if (incomingFlowFile == null) { return; } @@ -424,59 +419,102 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { // because is the original flowfile is used for all matching rules. in this // case the order of the matching rules is preserved in the list final Map> matchedRules = new HashMap<>(); - Map statefulAttributes = null; - matchedRules.clear(); + final Map stateInitialAttributes; + final Map stateWorkingAttributes; + StateMap stateMap = null; try { if (stateful) { - statefulAttributes = new HashMap<>(context.getStateManager().getState(Scope.LOCAL).toMap()); + stateMap = context.getStateManager().getState(Scope.LOCAL); + stateInitialAttributes = stateMap.toMap(); + stateWorkingAttributes = new HashMap<>(stateMap.toMap()); } else { - statefulAttributes = null; + stateInitialAttributes = null; + stateWorkingAttributes = null; } } catch (IOException e) { - logger.error("Failed to update attributes for {} due to failing to get state; transferring FlowFile back to '{}'", new Object[]{flowFile, Relationship.SELF.getName()}, e); - session.transfer(flowFile); + logger.error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", new Object[]{incomingFlowFile}, e); + session.transfer(incomingFlowFile); context.yield(); return; } Map defaultActions = this.defaultActions; + List flowFilesToTransfer = new LinkedList<>(); // if there is update criteria specified, evaluate it - if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules, statefulAttributes)) { + if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) { // apply the actions for each rule and transfer the flowfile for (final Map.Entry> entry : matchedRules.entrySet()) { FlowFile match = entry.getKey(); final List rules = entry.getValue(); + boolean updateWorking = incomingFlowFile.equals(match); // execute each matching rule(s) - try { - match = executeActions(session, context, rules, defaultActions, match, statefulAttributes); - logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); + match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes); - // transfer the match - session.getProvenanceReporter().modifyAttributes(match); - session.transfer(match, REL_SUCCESS); - } catch (IOException e) { - logger.error("Failed to update attributes for {} due to a failure to set the state afterwards; transferring to '{}'", new Object[]{match, REL_FAILED_SET_STATE.getName()}, e); - session.transfer(match, REL_FAILED_SET_STATE); - return; + if (updateWorking) { + incomingFlowFile = match; } + + if (debugEnabled) { + logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); + } + + // add the match to the list to transfer + flowFilesToTransfer.add(match); } } else { - // transfer the flowfile to no match (that has the default actions applied) + // Either we're running without any rules or the FlowFile didn't match any + incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes); + + if (debugEnabled) { + logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()}); + } + + // add the flowfile to the list to transfer + flowFilesToTransfer.add(incomingFlowFile); + } + + if (stateInitialAttributes != null) { try { - flowFile = executeActions(session, context, null, defaultActions, flowFile, statefulAttributes); - logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{flowFile, REL_SUCCESS.getName()}); - session.getProvenanceReporter().modifyAttributes(flowFile); - session.transfer(flowFile, REL_SUCCESS); + // Able to use "equals()" since we're just checking if the map was modified at all + if (!stateWorkingAttributes.equals(stateInitialAttributes)) { + + boolean setState = context.getStateManager().replace(stateMap, stateWorkingAttributes, Scope.LOCAL); + if (!setState) { + logger.warn("Failed to update the state after successfully processing {} due to having an old version of the StateMap. This is normally due to multiple threads running at " + + "once; transferring to '{}'", new Object[]{incomingFlowFile, REL_FAILED_SET_STATE.getName()}); + + flowFilesToTransfer.remove(incomingFlowFile); + if (flowFilesToTransfer.size() > 0){ + session.remove(flowFilesToTransfer); + } + + session.transfer(incomingFlowFile, REL_FAILED_SET_STATE); + return; + } + } } catch (IOException e) { - logger.error("Failed to update attributes for {} due to failures setting state afterwards; transferring to '{}'", new Object[]{flowFile, REL_FAILED_SET_STATE.getName()}, e); - session.transfer(flowFile, REL_FAILED_SET_STATE); + logger.error("Failed to set the state after successfully processing {} due a failure when setting the state. This is normally due to multiple threads running at " + + "once; transferring to '{}'", new Object[]{incomingFlowFile, REL_FAILED_SET_STATE.getName()}, e); + + flowFilesToTransfer.remove(incomingFlowFile); + if (flowFilesToTransfer.size() > 0){ + session.remove(flowFilesToTransfer); + } + + session.transfer(incomingFlowFile, REL_FAILED_SET_STATE); + context.yield(); return; } } + + for(FlowFile toTransfer: flowFilesToTransfer) { + session.getProvenanceReporter().modifyAttributes(toTransfer); + } + session.transfer(flowFilesToTransfer, REL_SUCCESS); } //Evaluates the specified Criteria on the specified flowfile. Clones the @@ -548,7 +586,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { // Executes the specified action on the specified flowfile. private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List rules, final Map defaultActions, final FlowFile flowfile, - final Map statefulAttributes) throws IOException { + final Map stateInitialAttributes, final Map stateWorkingAttributes) { final ComponentLog logger = getLogger(); final Map actions = new HashMap<>(defaultActions); final String ruleName = (rules == null || rules.isEmpty()) ? "default" : rules.get(rules.size() - 1).getName(); @@ -579,14 +617,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { final Map attributesToUpdate = new HashMap<>(actions.size()); final Set attributesToDelete = new HashSet<>(actions.size()); - final Map statefulAttributesToSet; - - if (statefulAttributes != null){ - statefulAttributesToSet = new HashMap<>(); - } else { - statefulAttributesToSet = null; - } - // go through each action boolean debugEnabled = this.debugEnabled; for (final Action action : actions.values()) { @@ -618,11 +648,11 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } } else { boolean notDeleted = !attributesToDelete.contains(attribute); - boolean setStatefulAttribute = statefulAttributesToSet != null && !attribute.equals("UpdateAttribute.matchedRule"); + boolean setStatefulAttribute = stateInitialAttributes != null && !attribute.equals("UpdateAttribute.matchedRule"); if (notDeleted || setStatefulAttribute) { try { - final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue(); + final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, stateInitialAttributes).getValue(); // log if appropriate if (debugEnabled) { @@ -630,7 +660,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } if (setStatefulAttribute) { - statefulAttributesToSet.put(attribute, newAttributeValue); + stateWorkingAttributes.put(attribute, newAttributeValue); } // No point in updating if it will be removed @@ -669,10 +699,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { returnFlowfile = session.removeAllAttributes(returnFlowfile, attributesToDelete); } - if(statefulAttributesToSet != null) { - context.getStateManager().setState(statefulAttributesToSet, Scope.LOCAL); - } - return returnFlowfile; } diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html index e2a9d83145..75428a6402 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html @@ -34,10 +34,39 @@

Please note that "Delete Attributes Expression" supersedes any updates that occur. If an existing attribute matches the "Delete Attributes Expression", it will be removed whether it - was updated or not. The "Delete Attributes Expression" only applies to attributes that exist in the input FlowFile, if it is added by this processor, the "Delete Attributes Expression" + was updated or not. That said, the "Delete Attributes Expression" only applies to attributes that exist in the input FlowFile, if it is added by this processor, the "Delete Attributes Expression" will not detect it.

+

+ Properties: +

+

+ The properties in this processor are added by the user. The expression language is supported in user-added + properties for this processor. See the NiFi Expression Language Guide to learn how to formulate proper expression language statements to perform the desired functions. +

+ +

+ If an Attribute is added with the name alternate.identifier and that attribute's value is a URI, an ADD_INFO Provenance Event will be registered, + correlating the FlowFile with the given alternate identifier. +

+ +

+ Relationships: +

+
    +
  • success +
      +
    • If the processor successfully updates the specified attribute(s), then the FlowFile follows this relationship.
    • +
    +
  • +
  • set state fail +
      +
    • If the processor is running statefully, and fails to set the state after adding attributes to the FlowFile, then the FlowFile will be routed to this relationship.
    • +
    +
  • +
+

Basic Usage

@@ -261,6 +290,7 @@ By selecting "store state locally" option for the "Store State" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be referenced as a stateful variable like so: +

  • Dynamic Property @@ -271,6 +301,7 @@
+

This example will keep a count of the total number of FlowFiles that have passed through the processor. To use logic on top of State, simply use the "Advanced Usage" of UpdateAttribute. All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the flow so far. This would be done by having a condition of "${getStateValue("maxValue"):lt(${value})}" and an action of attribute:"maxValue", value:"${value}". @@ -278,34 +309,34 @@ The "Stateful Variables Initial Value" property is used to initialize the stateful variables and is required to be set if running statefully. Some logic rules will require a very high initial value, like using the Advanced rules to determine the minimum value. - If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind. For example, attempting to calculate the average of the incoming stream requires the sum and count. If all three properties are set in the same UpdateAttribute (like below) then the Average will always not include the most recent values of count and sum: -

    -
  • Count -
      -
    • key : theCount
    • -
    • value : ${getStateValue("theCount"):plus(1)}
    • -
    -
  • +

    +
      +
    • Count +
        +
      • key : theCount
      • +
      • value : ${getStateValue("theCount"):plus(1)}
      • +
      +
    • -
    • Sum -
        -
      • key : theSum
      • -
      • value : ${getStateValue("theSum"):plus(${flowfileValue})}
      • -
      -
    • - -
    • Average -
        -
      • key : theAverage
      • -
      • value : ${getStateValue("theSum"):divide(getStateValue("theCount"))}
      • -
      -
    • -
    +
  • Sum +
      +
    • key : theSum
    • +
    • value : ${getStateValue("theSum"):plus(${flowfileValue})}
    • +
    +
  • +
  • Average +
      +
    • key : theAverage
    • +
    • value : ${getStateValue("theSum"):divide(getStateValue("theCount"))}
    • +
    +
  • +
+

Instead, since average only relies on theCount and theSum attributes (which are added to the FlowFile as well) there should be a following Stateless UpdateAttribute which properly calculates the average. @@ -319,33 +350,47 @@

- Properties: -

-

- The properties in this processor are added by the user. The expression language is supported in user-added - properties for this processor. See the NiFi Expression Language Guide to learn how to formulate proper expression language statements to perform the desired functions. + Combining the Advanced Usage with Stateful

- If an Attribute is added with the name alternate.identifier and that attribute's value is a URI, an ADD_INFO Provenance Event will be registered, - correlating the FlowFile with the given alternate identifier. + The UpdateAttribute processor allows you to use both advanced usage changes (i.e., conditional) in addition to storing the values in state at the same time. This allows UpdateAttribute to + act as a stateful rules engine to enable powerful concepts such as a Finite-State machine or keeping track of a min/max value. + + Working with both is relatively simple, when the processor would normally update an attribute on the processor (ie. it matches a conditional rule) the same update is stored to state. + Referencing state via the advanced tab is done in the same way too, using "getStateValue". + + Note: In the event the "use clone" policy is set and the state is failed to set, no clones will be generated and only the original FlowFile will be transferred to "set state fail".

- Relationships: + Notes about Concurrency and Stateful Usage

+

+ When using the stateful option, concurrent tasks should be used with caution. If every incoming FlowFile will update state then it will be much more efficient to have only one + task. This is because the first thing the onTrigger does is get the state and the last thing it does is store the state if there are an updates. If it does not have the most up to date + initial state when it goes to update it will fail and send the FlowFile to "set state fail". This is done so that the update is successful when it was done with the most recent information. + If it didn't do it in this mock-atomic way, there'd be no guarantee that the state is accurate. + + When considering Concurrency, the use-cases generally fall into one of three categories:

    -
  • success -
      -
    • If the processor successfully updates the specified attribute(s), then the FlowFile follows this relationship.
    • -
    +
  • A data stream where each FlowFile updates state ex. updating a counter
  • -
  • set state fail -
      -
    • If the processor is running statefully, and fails to set the state after adding attributes to the FlowFile, then the FlowFile will be routed to this relationship.
    • -
    + +
  • A data stream where a FlowFile doesn't always update state ex. a Finite-State machine +
  • + +
  • A data stream that doesn't update state, and a second "control" stream that one updates every time but is rare compared to the data stream ex. a trigger
+

+ The first and last cases are relatively clear-cut in their guidance. For the first, concurrency should not be used. Doing so will just waste CPU and any benefits of concurrency will be wiped + due to misses in state. For the last case, it can easily be done using concurrency. Since updates are rare in the first place it will be even more rare that two updates are processed at + the same time that cause problems. + + The second case is a bit of a grey area. If updates are rare then concurrency can probably be used. If updates are frequent then concurrency would probably cause more problems than benefits. + Regardless, testing to determine the appropriate tuning is the only true answer. +

diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index 1812a509f3..5c00a20b7f 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -19,6 +19,7 @@ package org.apache.nifi.update.attributes; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -26,7 +27,10 @@ import java.util.Map; import java.util.UUID; import java.util.regex.PatternSyntaxException; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processors.attributes.UpdateAttribute; +import org.apache.nifi.state.MockStateManager; import org.apache.nifi.update.attributes.serde.CriteriaSerDe; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -152,6 +156,43 @@ public class TestUpdateAttribute { runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(4).assertAttributeEquals("sum", "15"); } + @Test + public void testStateFailures() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + final UpdateAttribute processor = (UpdateAttribute) runner.getProcessor(); + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + MockStateManager mockStateManager = runner.getStateManager(); + + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); + runner.setProperty("count", "${getStateValue('count'):plus(1)}"); + runner.setProperty("sum", "${getStateValue('sum'):plus(${pencils})}"); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0"); + + processor.onScheduled(runner.getProcessContext()); + + final Map attributes2 = new HashMap<>(); + attributes2.put("pencils", "2"); + + mockStateManager.setFailOnStateGet(Scope.LOCAL, true); + + runner.enqueue(new byte[0],attributes2); + processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession()); + + runner.assertQueueNotEmpty(); + + mockStateManager.setFailOnStateGet(Scope.LOCAL, false); + mockStateManager.setFailOnStateSet(Scope.LOCAL, true); + + processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession()); + + runner.assertQueueEmpty(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_FAILED_SET_STATE, 1); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("count", "1"); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("sum", "2"); + } + + @Test public void testStateWithInitValue() throws Exception { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); @@ -218,6 +259,99 @@ public class TestUpdateAttribute { result.get(3).assertAttributeEquals("maxValue", null); } + @Test + public void testStateFailuresWithRulesUsingOriginal() throws Exception { + final Criteria criteria = getCriteria(); + criteria.setFlowFilePolicy(FlowFilePolicy.USE_ORIGINAL); + addRule(criteria, "rule", Collections.singletonList( + // conditions + "${getStateValue('maxValue'):lt(${value})}"), getMap( + // actions + "maxValue", "${value}")); + addRule(criteria, "rule2", Collections.singletonList( + // conditions + "${getStateValue('maxValue2'):lt(${value})}"), getMap( + // actions + "maxValue2", "${value}")); + + TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + final UpdateAttribute processor = (UpdateAttribute) runner.getProcessor(); + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + MockStateManager mockStateManager = runner.getStateManager(); + + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0"); + runner.setAnnotationData(serialize(criteria)); + + + processor.onScheduled(runner.getProcessContext()); + + final Map attributes = new HashMap<>(); + attributes.put("value", "1"); + runner.enqueue(new byte[0], attributes); + + mockStateManager.setFailOnStateGet(Scope.LOCAL, true); + processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession()); + + runner.assertQueueNotEmpty(); + mockStateManager.setFailOnStateGet(Scope.LOCAL, false); + mockStateManager.setFailOnStateSet(Scope.LOCAL, true); + + processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession()); + + runner.assertQueueEmpty(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_FAILED_SET_STATE, 1); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("maxValue", "1"); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("maxValue2", "1"); + } + + @Test + public void testStateFailuresWithRulesUsingClone() throws Exception { + final Criteria criteria = getCriteria(); + criteria.setFlowFilePolicy(FlowFilePolicy.USE_CLONE); + addRule(criteria, "rule", Collections.singletonList( + // conditions + "${getStateValue('maxValue'):lt(${value})}"), getMap( + // actions + "maxValue", "${value}")); + addRule(criteria, "rule2", Collections.singletonList( + // conditions + "${getStateValue('maxValue2'):lt(${value})}"), getMap( + // actions + "maxValue2", "${value}")); + + TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + final UpdateAttribute processor = (UpdateAttribute) runner.getProcessor(); + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + MockStateManager mockStateManager = runner.getStateManager(); + + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0"); + runner.setAnnotationData(serialize(criteria)); + + + processor.onScheduled(runner.getProcessContext()); + + final Map attributes = new HashMap<>(); + attributes.put("value", "1"); + runner.enqueue(new byte[0], attributes); + + mockStateManager.setFailOnStateGet(Scope.LOCAL, true); + processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession()); + + runner.assertQueueNotEmpty(); + mockStateManager.setFailOnStateGet(Scope.LOCAL, false); + mockStateManager.setFailOnStateSet(Scope.LOCAL, true); + + processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession()); + + runner.assertQueueEmpty(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_FAILED_SET_STATE, 1); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("maxValue", "1"); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeNotExists("maxValue2"); + } @Test public void testRuleHitWithStateWithDefault() throws Exception { final Criteria criteria = getCriteria(); @@ -289,11 +423,17 @@ public class TestUpdateAttribute { @Test public void testMultipleRulesWithStateAndDelete() throws Exception { final Criteria criteria = getCriteria(); - addRule(criteria, "rule", Arrays.asList( + criteria.setFlowFilePolicy(FlowFilePolicy.USE_ORIGINAL); + addRule(criteria, "rule", Collections.singletonList( // conditions "${getStateValue('maxValue'):lt(${value})}"), getMap( // actions "maxValue", "${value}")); + addRule(criteria, "rule2", Collections.singletonList( + // conditions + "${value:mod(2):equals(0)}"), getMap( + // actions + "whatIsIt", "even")); TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); @@ -301,6 +441,8 @@ public class TestUpdateAttribute { runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0"); runner.setAnnotationData(serialize(criteria)); runner.setProperty("maxValue", "${getStateValue('maxValue')}"); + runner.setProperty("whatIsIt", "odd"); + runner.setProperty("whatWasIt", "${getStateValue('whatIsIt')}"); runner.setProperty("theCount", "${getStateValue('theCount'):plus(1)}"); final Map attributes = new HashMap<>(); @@ -322,7 +464,14 @@ public class TestUpdateAttribute { final List result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS); result.get(3).assertAttributeEquals("maxValue", "5"); result.get(3).assertAttributeEquals("theCount", "4"); + result.get(0).assertAttributeEquals("badValue", null); + + result.get(0).assertAttributeEquals("whatIsIt", "odd"); + result.get(1).assertAttributeEquals("whatIsIt", "even"); + + result.get(2).assertAttributeEquals("whatWasIt", "even"); + result.get(3).assertAttributeEquals("whatWasIt", "odd"); } @Test