diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java
index 666ada5539..efe76ee591 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java
@@ -257,6 +257,29 @@ public interface PropertyValue {
*/
PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator) throws ProcessException;
+
+ /**
+ *
+ * Replaces values in the Property Value using the NiFi Expression
+ * Language; a PropertyValue with the new value is then returned, supporting
+ * call chaining.
+ *
+ *
+ * @param flowFile to evaluate attributes of
+ * @param additionalAttributes a Map of additional attributes that the Expression can reference. If entries in
+ * this Map conflict with entries in the FlowFile's attributes, the entries in this Map are given a higher priority.
+ * @param decorator the decorator to use in order to update the values returned by the Expression Language
+ * @param stateValues a Map of the state values to be referenced explicitly by specific state accessing functions
+ *
+ * @return a PropertyValue with the new value is returned, supporting call
+ * chaining
+ *
+ * @throws ProcessException if the Expression cannot be compiled or evaluating
+ * the Expression against the given attributes causes an Exception to be thrown
+ */
+ public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator, Map stateValues)
+ throws ProcessException;
+
/**
*
* Replaces values in the Property Value using the NiFi Expression Language.
diff --git a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g
index f09eba8e37..071fda9a2a 100644
--- a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g
+++ b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g
@@ -137,6 +137,7 @@ UNESCAPE_HTML3 : 'unescapeHtml3';
UNESCAPE_HTML4 : 'unescapeHtml4';
BASE64_ENCODE : 'base64Encode';
BASE64_DECODE : 'base64Decode';
+GET_STATE_VALUE: 'getStateValue';
// 1 arg functions
SUBSTRING_AFTER : 'substringAfter';
diff --git a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g
index eb50a280f9..11cbec866a 100644
--- a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g
+++ b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g
@@ -129,7 +129,7 @@ functionCall : functionRef ->
booleanLiteral : TRUE | FALSE;
zeroArgStandaloneFunction : (IP | UUID | NOW | NEXT_INT | HOSTNAME | RANDOM) LPAREN! RPAREN!;
-oneArgStandaloneFunction : ((TO_LITERAL | MATH)^ LPAREN! anyArg RPAREN!) |
+oneArgStandaloneFunction : ((TO_LITERAL | MATH | GET_STATE_VALUE)^ LPAREN! anyArg RPAREN!) |
(HOSTNAME^ LPAREN! booleanLiteral RPAREN!);
standaloneFunction : zeroArgStandaloneFunction | oneArgStandaloneFunction;
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/AttributesAndState.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/AttributesAndState.java
new file mode 100644
index 0000000000..b4ec5b2e22
--- /dev/null
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/AttributesAndState.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.attribute.expression.language;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/*
+ *This class is passed to Evaluators so that certain evaluators that specifically work with state will have access to the state values explicitly.
+ *It implements Map so that other evaluators don't have to be changed.
+ */
+public class AttributesAndState implements Map {
+
+ private final Map stateMap;
+ private final Map attributes;
+
+ public AttributesAndState(Map attributes, Map state) {
+ super();
+ stateMap = state;
+ this.attributes = attributes;
+ }
+
+ public Map getStateMap() {
+ return stateMap;
+ }
+
+ @Override
+ public int size() {
+ return attributes.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return attributes.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return attributes.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return attributes.containsValue(value);
+ }
+
+ @Override
+ public String get(Object key) {
+ return attributes.get(key);
+ }
+
+ @Override
+ public String put(String key, String value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String remove(Object key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void putAll(Map extends String, ? extends String> m) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set keySet() {
+ return attributes.keySet();
+ }
+
+ @Override
+ public Collection values() {
+ return attributes.values();
+ }
+
+ @Override
+ public Set> entrySet() {
+ return attributes.entrySet();
+ }
+}
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java
index 5ed00ed74e..403753163f 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java
@@ -33,4 +33,9 @@ public class EmptyPreparedQuery implements PreparedQuery {
public String evaluateExpressions(Map valueLookup, AttributeValueDecorator decorator) throws ProcessException {
return value;
}
+
+ @Override
+ public String evaluateExpressions(Map attributes, AttributeValueDecorator decorator, Map stateVariables) throws ProcessException {
+ return value;
+ }
}
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java
index cbf6c663b5..1033c713eb 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java
@@ -43,5 +43,8 @@ public class InvalidPreparedQuery implements PreparedQuery {
throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation);
}
-
+ @Override
+ public String evaluateExpressions( Map valueLookup, AttributeValueDecorator decorator, Map stateVariables) throws ProcessException {
+ throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation);
+ }
}
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java
index e1a1db7bdd..53f7296234 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java
@@ -25,4 +25,5 @@ public interface PreparedQuery {
String evaluateExpressions(Map valueLookup, AttributeValueDecorator decorator) throws ProcessException;
+ String evaluateExpressions(final Map valueLookup, final AttributeValueDecorator decorator, final Map stateVariables) throws ProcessException;
}
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java
index fb48b0ffe3..06caf5361f 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java
@@ -49,6 +49,7 @@ import org.apache.nifi.attribute.expression.language.evaluation.functions.FindEv
import org.apache.nifi.attribute.expression.language.evaluation.functions.FormatEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.FromRadixEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.GetDelimitedFieldEvaluator;
+import org.apache.nifi.attribute.expression.language.evaluation.functions.GetStateVariableEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.GreaterThanEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.GreaterThanOrEqualEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.HostnameEvaluator;
@@ -149,6 +150,7 @@ import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpre
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.FIND;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.FORMAT;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GET_DELIMITED_FIELD;
+import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GET_STATE_VALUE;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GREATER_THAN;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GREATER_THAN_OR_EQUAL;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.HOSTNAME;
@@ -384,8 +386,9 @@ public class Query {
return -1;
}
- static String evaluateExpression(final Tree tree, final String queryText, final Map valueMap, final AttributeValueDecorator decorator) throws ProcessException {
- final Object evaluated = Query.fromTree(tree, queryText).evaluate(valueMap).getValue();
+ static String evaluateExpression(final Tree tree, final String queryText, final Map valueMap, final AttributeValueDecorator decorator,
+ final Map stateVariables) throws ProcessException {
+ final Object evaluated = Query.fromTree(tree, queryText).evaluate(valueMap, stateVariables).getValue();
if (evaluated == null) {
return null;
}
@@ -395,6 +398,11 @@ public class Query {
return decorator == null ? escaped : decorator.decorate(escaped);
}
+ static String evaluateExpressions(final String rawValue, Map expressionMap, final AttributeValueDecorator decorator, final Map stateVariables)
+ throws ProcessException {
+ return Query.prepare(rawValue).evaluateExpressions(expressionMap, decorator, stateVariables);
+ }
+
static String evaluateExpressions(final String rawValue, final Map valueLookup) throws ProcessException {
return evaluateExpressions(rawValue, valueLookup, null);
}
@@ -563,13 +571,22 @@ public class Query {
}
QueryResult> evaluate(final Map map) {
+ return evaluate(map, null);
+ }
+
+ QueryResult> evaluate(final Map attributes, final Map stateMap) {
if (evaluated.getAndSet(true)) {
throw new IllegalStateException("A Query cannot be evaluated more than once");
}
-
- return evaluator.evaluate(map);
+ if (stateMap != null) {
+ AttributesAndState attributesAndState = new AttributesAndState(attributes, stateMap);
+ return evaluator.evaluate(attributesAndState);
+ } else {
+ return evaluator.evaluate(attributes);
+ }
}
+
Tree getTree() {
return this.tree;
}
@@ -747,6 +764,12 @@ public class Query {
throw new AttributeExpressionLanguageParsingException("Call to math() as the subject must take exactly 1 parameter");
}
}
+ case GET_STATE_VALUE: {
+ final Tree childTree = tree.getChild(0);
+ final Evaluator> argEvaluator = buildEvaluator(childTree);
+ final Evaluator stringEvaluator = toStringEvaluator(argEvaluator);
+ return new GetStateVariableEvaluator(stringEvaluator);
+ }
default:
throw new AttributeExpressionLanguageParsingException("Unexpected token: " + tree.toString());
}
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
index 7473b3b11f..39cfb25f9f 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
@@ -38,14 +38,14 @@ public class StandardPreparedQuery implements PreparedQuery {
@Override
- public String evaluateExpressions(final Map valueMap, final AttributeValueDecorator decorator) throws ProcessException {
+ public String evaluateExpressions(final Map valMap, final AttributeValueDecorator decorator, final Map stateVariables) throws ProcessException {
final StringBuilder sb = new StringBuilder();
for (final String val : queryStrings) {
final Tree tree = trees.get(val);
if (tree == null) {
sb.append(val);
} else {
- final String evaluated = Query.evaluateExpression(tree, val, valueMap, decorator);
+ final String evaluated = Query.evaluateExpression(tree, val, valMap, decorator, stateVariables);
if (evaluated != null) {
sb.append(evaluated);
}
@@ -54,4 +54,9 @@ public class StandardPreparedQuery implements PreparedQuery {
return sb.toString();
}
+ @Override
+ public String evaluateExpressions(final Map valMap, final AttributeValueDecorator decorator)
+ throws ProcessException {
+ return evaluateExpressions(valMap, decorator, null);
+ }
}
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java
index d70b2d86ac..94c1c5021e 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java
@@ -145,11 +145,17 @@ public class StandardPropertyValue implements PropertyValue {
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
+ return evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, null);
+ }
+
+ @Override
+ public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator, Map stateValues)
+ throws ProcessException {
if (rawValue == null || preparedQuery == null) {
return this;
}
final ValueLookup lookup = new ValueLookup(variableRegistry, flowFile, additionalAttributes);
- return new StandardPropertyValue(preparedQuery.evaluateExpressions(lookup, decorator), serviceLookup, null, variableRegistry);
+ return new StandardPropertyValue(preparedQuery.evaluateExpressions(lookup, decorator, stateValues), serviceLookup, null);
}
@Override
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetStateVariableEvaluator.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetStateVariableEvaluator.java
new file mode 100644
index 0000000000..8808e17eb6
--- /dev/null
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetStateVariableEvaluator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.attribute.expression.language.evaluation.functions;
+
+import org.apache.nifi.attribute.expression.language.AttributesAndState;
+import org.apache.nifi.attribute.expression.language.evaluation.Evaluator;
+import org.apache.nifi.attribute.expression.language.evaluation.QueryResult;
+import org.apache.nifi.attribute.expression.language.evaluation.StringEvaluator;
+import org.apache.nifi.attribute.expression.language.evaluation.StringQueryResult;
+
+import java.util.Map;
+
+public class GetStateVariableEvaluator extends StringEvaluator {
+
+ private final Evaluator subject;
+
+ public GetStateVariableEvaluator(final Evaluator subject) {
+ this.subject = subject;
+ }
+
+ @Override
+ public QueryResult evaluate(Map attributes) {
+ if (!(attributes instanceof AttributesAndState)){
+ return new StringQueryResult(null);
+ }
+
+ final String subjectValue = subject.evaluate(attributes).getValue();
+ if (subjectValue == null) {
+ return new StringQueryResult(null);
+ }
+
+ AttributesAndState attributesAndState = (AttributesAndState) attributes;
+
+ Map stateMap = attributesAndState.getStateMap();
+ String stateValue = stateMap.get(subjectValue);
+
+ return new StringQueryResult(stateValue);
+ }
+
+ @Override
+ public Evaluator getSubjectEvaluator() {
+ return subject;
+ }
+}
diff --git a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java
index f47fbbb759..b666b261d3 100644
--- a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java
+++ b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java
@@ -72,6 +72,7 @@ public class TestQuery {
assertValid("${hostname()}");
assertValid("${literal(3)}");
assertValid("${random()}");
+ assertValid("${getStateValue('the_count')}");
// left here because it's convenient for looking at the output
//System.out.println(Query.compile("").evaluate(null));
}
@@ -1488,6 +1489,32 @@ public class TestQuery {
assertEquals("{ xyz }", Query.evaluateExpressions(query, attributes));
}
+ @Test
+ public void testGetStateValue() {
+ final Map stateValues = new HashMap<>();
+ stateValues.put("abc", "xyz");
+ stateValues.put("123", "qwe");
+ stateValues.put("true", "asd");
+ stateValues.put("iop", "098");
+
+ final Map attributes = new HashMap<>();
+ attributes.put("abc", "iop");
+ attributes.put("4321", "123");
+ attributes.put("false", "bnm");
+
+ String query = "${getStateValue('abc')}";
+ verifyEquals(query, attributes, stateValues, "xyz");
+
+ query = "${getStateValue(${'4321':toString()})}";
+ verifyEquals(query, attributes, stateValues, "qwe");
+
+ query = "${getStateValue(${literal(true):toString()})}";
+ verifyEquals(query, attributes, stateValues, "asd");
+
+ query = "${getStateValue(${abc}):equals('098')}";
+ verifyEquals(query, attributes, stateValues, true);
+ }
+
@Test
public void testLiteralFunction() {
final Map attrs = Collections.emptyMap();
@@ -1658,11 +1685,15 @@ public class TestQuery {
}
private void verifyEquals(final String expression, final Map attributes, final Object expectedResult) {
+ verifyEquals(expression,attributes, null, expectedResult);
+ }
+
+ private void verifyEquals(final String expression, final Map attributes, final Map stateValues, final Object expectedResult) {
Query.validateExpression(expression, false);
- assertEquals(String.valueOf(expectedResult), Query.evaluateExpressions(expression, attributes, null));
+ assertEquals(String.valueOf(expectedResult), Query.evaluateExpressions(expression, attributes, null, stateValues));
final Query query = Query.compile(expression);
- final QueryResult> result = query.evaluate(attributes);
+ final QueryResult> result = query.evaluate(attributes, stateValues);
if (expectedResult instanceof Long) {
if (ResultType.NUMBER.equals(result.getResultType())) {
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index a577bc8118..772aa8e97c 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -23,6 +23,8 @@ import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.time.Instant;
+import java.text.NumberFormat;
+import java.text.ParseException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -126,6 +128,24 @@ public class StandardValidators {
}
};
+ public static final Validator NUMBER_VALIDATOR = new Validator() {
+ @Override
+ public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+ return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+ }
+
+ String reason = null;
+ try {
+ NumberFormat.getInstance().parse(value);
+ } catch (ParseException e) {
+ reason = "not a valid Number";
+ }
+
+ return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+ }
+ };
+
public static final Validator PORT_VALIDATOR = createLongValidator(1, 65535, true);
/**
diff --git a/nifi-docs/src/main/asciidoc/expression-language-guide.adoc b/nifi-docs/src/main/asciidoc/expression-language-guide.adoc
index c5e2a77ff6..7af3be73f9 100644
--- a/nifi-docs/src/main/asciidoc/expression-language-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/expression-language-guide.adoc
@@ -1974,6 +1974,22 @@ an error when validating the function.
`${literal( ${allMatchingAttributes('a.*'):count()} ):gt(3)}` returns true if there are more than 3 attributes whose
names begin with the letter `a`.
+[.function]
+=== getStateValue
+
+*Description*: [.description]#Access a processor's state values by passing in the String key and getting the value back as a String. This
+ is a special Expression Language function that only works with processors that explicitly allow EL to query state. Currently only UpdateAttribute
+ does.#
+
+*Subject Type*: [.subjectless]#No Subject#
+
+*Arguments*:
+
+ - [.String]#_Key_# : [.argDesc]#The key to use when accessing the state map.#
+
+*Return Type*: [.returnType]#String#
+
+*Examples*: UpdateAttribute processor has stored the key "count" with value "20" in state. '${getStateValue("count")}` returns `20`.
[[multi]]
== Evaluating Multiple Attributes
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
index 86c6ee7f9f..b6752a7588 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
@@ -171,12 +171,18 @@ public class MockPropertyValue implements PropertyValue {
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
+ return evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, null);
+ }
+
+ @Override
+ public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator, Map stateValues)
+ throws ProcessException {
markEvaluated();
if (rawValue == null) {
return this;
}
- final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator);
+ final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, stateValues);
return new MockPropertyValue(newValue.getValue(), serviceLookup, propertyDescriptor, true, variableRegistry);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index 169cdee0a9..7c4ce77d32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -160,6 +160,12 @@ public class ConnectableProcessContext implements ProcessContext {
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator) throws ProcessException {
return null;
}
+
+ @Override
+ public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator, Map stateValues)
+ throws ProcessException {
+ return null;
+ }
};
}
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index 08f4ee9ebc..22e558bd25 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.attributes;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -37,6 +38,8 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -46,6 +49,9 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -66,71 +72,49 @@ import org.apache.nifi.update.attributes.FlowFilePolicy;
import org.apache.nifi.update.attributes.Rule;
import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
-/**
- * This processor supports updating flowfile attributes and can do so
- * conditionally or unconditionally. It can also delete flowfile attributes
- * that match a regular expression.
- *
- * Like the FlowFileMetadataEnhancer, it can
- * be configured with an arbitrary number of optional properties to define how
- * attributes should be updated. Each optional property represents an action
- * that is applied to all incoming flow files. An action is comprised of an
- * attribute key and a format string. The format string supports the following
- * parameters.
- *
- * - %1 - is the random generated UUID.
- * - %2 - is the current calendar time.
- * - ${"attribute.key") - is the flow file attribute value of the key
- * contained within the brackets.
- *
- *
- * When creating the optional properties, enter the attribute key as the
- * property name and the desired format string as the value. The optional
- * properties are considered default actions and are applied unconditionally.
- *
- * In addition to the default actions, this processor has a user interface (UI)
- * where conditional actions can be specified. In the UI, rules can be created.
- * Rules are comprised of an arbitrary number of conditions and actions. In
- * order for a rule to be activated, all conditions must evaluate to true.
- *
- * A rule condition is comprised of an attribute key and a regular expression. A
- * condition evaluates to true when the flowfile contains the attribute
- * specified and it's value matches the specified regular expression.
- *
- * A rule action follows the same definition as a rule above. It includes an
- * attribute key and a format string. The format string supports the same
- * parameters defined above.
- *
- * When a rule is activated (because conditions evaluate to true), all actions
- * in that rule are executed. Once each action has been applied, any remaining
- * default actions will be applied. This means that if rule action and a default
- * action modify the same attribute, only the rule action will execute. Default
- * actions will only execute when the attribute in question is not modified as
- * part of an activated rule.
- *
- * The incoming flow file is cloned for each rule that is activated. If no rule
- * is activated, any default actions are applied to the original flowfile and it
- * is transferred.
- *
- * This processor only supports a SUCCESS relationship.
- *
- * Note: In order for configuration changes made in the custom UI to take
- * effect, the processor must be stopped and started.
- */
@EventDriven
@SideEffectFree
+@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"attributes", "modification", "update", "delete", "Attribute Expression Language"})
+@Tags({"attributes", "modification", "update", "delete", "Attribute Expression Language", "state"})
@CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language and/or deletes the attributes based on a regular expression")
@DynamicProperty(name = "A FlowFile attribute to update", value = "The value to set it to", supportsExpressionLanguage = true,
description = "Updates a FlowFile attribute specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")
@WritesAttribute(attribute = "See additional details", description = "This processor may write or remove zero or more attributes as described in additional details")
+@Stateful(scopes = {Scope.LOCAL}, description = "Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner.")
public class UpdateAttribute extends AbstractProcessor implements Searchable {
+
+ public static final String DO_NOT_STORE_STATE = "do not store state";
+ public static final String STORE_STATE_LOCALLY = "store state locally";
+
+ private boolean stateful = false;
private final AtomicReference criteriaCache = new AtomicReference<>(null);
private final ConcurrentMap propertyValues = new ConcurrentHashMap<>();
- private final Set relationships;
+ private final static Set statelessRelationshipSet;
+ private final static Set statefulRelationshipSet;
+
+ // relationships
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .description("All successful FlowFiles are routed to this relationship").name("success").build();
+ public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder()
+ .description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here.").name("set state fail").build();
+
+ static {
+ Set tempStatelessSet = new HashSet<>();
+ tempStatelessSet.add(REL_SUCCESS);
+
+ statelessRelationshipSet = Collections.unmodifiableSet(tempStatelessSet);
+
+ Set tempStatefulSet = new HashSet<>();
+ tempStatefulSet.add(REL_SUCCESS);
+ tempStatefulSet.add(REL_FAILED_SET_STATE);
+
+ statefulRelationshipSet = Collections.unmodifiableSet(tempStatefulSet);
+ }
+
+ private volatile Set relationships;
private static final Validator DELETE_PROPERTY_VALIDATOR = new Validator() {
private final Validator DPV_RE_VALIDATOR = StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true);
@@ -162,20 +146,32 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
// static properties
public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("Delete Attributes Expression")
- .description("Regular expression for attributes to be deleted from flowfiles.")
+ .description("Regular expression for attributes to be deleted from FlowFiles.")
.required(false)
.addValidator(DELETE_PROPERTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
- // relationships
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .description("All FlowFiles are routed to this relationship").name("success").build();
+ public static final PropertyDescriptor STORE_STATE = new PropertyDescriptor.Builder()
+ .name("Store State")
+ .description("Select whether or not state will be stored. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " +
+ "FlowFile in a stateless manner. Selecting a stateful option will not only store the attributes on the FlowFile but also in the Processors " +
+ "state. See the 'Stateful Usage' topic of the 'Additional Details' section of this processor's documentation for more information")
+ .required(true)
+ .allowableValues(DO_NOT_STORE_STATE, STORE_STATE_LOCALLY)
+ .defaultValue(DO_NOT_STORE_STATE)
+ .build();
+ public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder()
+ .name("Stateful Variables Initial Value")
+ .description("If using state to set/reference variables then this value is used to set the initial value of the stateful variable. This will only be used in the @OnScheduled method " +
+ "when state does not contain a value for the variable. This is required if running statefully but can be empty if needed.")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
public UpdateAttribute() {
- final Set relationshipSet = new HashSet<>();
- relationshipSet.add(REL_SUCCESS);
- relationships = Collections.unmodifiableSet(relationshipSet);
+ relationships = statelessRelationshipSet;
}
@Override
@@ -187,30 +183,96 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
protected List getSupportedPropertyDescriptors() {
List descriptors = new ArrayList<>();
descriptors.add(DELETE_ATTRIBUTES);
+ descriptors.add(STORE_STATE);
+ descriptors.add(STATEFUL_VARIABLES_INIT_VALUE);
return Collections.unmodifiableList(descriptors);
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
+ PropertyDescriptor.Builder propertyBuilder = new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
- .dynamic(true)
- .build();
+ .dynamic(true);
+
+ if (stateful) {
+ return propertyBuilder
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+ .build();
+ } else {
+ return propertyBuilder
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ }
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+
+ if (descriptor.equals(STORE_STATE)) {
+ if (DO_NOT_STORE_STATE.equals(newValue)){
+ stateful = false;
+ relationships = statelessRelationshipSet;
+ } else {
+ stateful = true;
+ relationships = statefulRelationshipSet;
+ }
+ }
}
@OnScheduled
- public void clearPropertyValueMap() {
+ public void onScheduled(final ProcessContext context) throws IOException {
+ criteriaCache.set(CriteriaSerDe.deserialize(context.getAnnotationData()));
+
propertyValues.clear();
+
+ if(stateful) {
+ StateManager stateManager = context.getStateManager();
+ StateMap state = stateManager.getState(Scope.LOCAL);
+ HashMap tempMap = new HashMap<>();
+ tempMap.putAll(state.toMap());
+ String initValue = context.getProperty(STATEFUL_VARIABLES_INIT_VALUE).getValue();
+
+ // Initialize the stateful default actions
+ for (PropertyDescriptor entry : context.getProperties().keySet()) {
+ if (entry.isDynamic()) {
+ if(!tempMap.containsKey(entry.getName())) {
+ tempMap.put(entry.getName(), initValue);
+ }
+ }
+ }
+
+ // Initialize the stateful actions if the criteria exists
+ final Criteria criteria = criteriaCache.get();
+ if (criteria != null) {
+ for (Rule rule : criteria.getRules()) {
+ for (Action action : rule.getActions()) {
+ if (!tempMap.containsKey(action.getAttribute())) {
+ tempMap.put(action.getAttribute(), initValue);
+ }
+ }
+ }
+ }
+
+ context.getStateManager().setState(tempMap, Scope.LOCAL);
+ }
}
@Override
protected Collection customValidate(final ValidationContext context) {
final List reasons = new ArrayList<>(super.customValidate(context));
+ if (!context.getProperty(STORE_STATE).getValue().equals(DO_NOT_STORE_STATE)){
+ String initValue = context.getProperty(STATEFUL_VARIABLES_INIT_VALUE).getValue();
+ if (initValue == null){
+ reasons.add(new ValidationResult.Builder().subject(STATEFUL_VARIABLES_INIT_VALUE.getDisplayName()).valid(false)
+ .explanation("initial state value much be set if the processor is configured to store state.").build());
+ }
+ }
+
Criteria criteria = null;
try {
criteria = CriteriaSerDe.deserialize(context.getAnnotationData());
@@ -325,20 +387,12 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
}
- @OnScheduled
- public void parseAnnotationData(final ProcessContext context) {
- criteriaCache.set(CriteriaSerDe.deserialize(context.getAnnotationData()));
- }
-
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final ComponentLog logger = getLogger();
final Criteria criteria = criteriaCache.get();
- List flowFiles = session.get(100);
- if (flowFiles.isEmpty()) {
- return;
- }
+ FlowFile flowFile = session.get();
final Map properties = context.getProperties();
@@ -353,45 +407,70 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
// because is the original flowfile is used for all matching rules. in this
// case the order of the matching rules is preserved in the list
final Map> matchedRules = new HashMap<>();
+ Map statefulAttributes = null;
- for (FlowFile flowFile : flowFiles) {
- matchedRules.clear();
+ matchedRules.clear();
- // if there is update criteria specified, evaluate it
- if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules)) {
- // apply the actions for each rule and transfer the flowfile
- for (final Map.Entry> entry : matchedRules.entrySet()) {
- FlowFile match = entry.getKey();
- final List rules = entry.getValue();
+ try {
+ if (stateful) {
+ statefulAttributes = new HashMap<>(context.getStateManager().getState(Scope.LOCAL).toMap());
+ } else {
+ statefulAttributes = null;
+ }
+ } catch (IOException e) {
+ logger.error("Failed to update attributes for {} due to failing to get state; transferring FlowFile back to '{}'", new Object[]{flowFile, Relationship.SELF.getName()}, e);
+ session.transfer(flowFile);
+ context.yield();
+ return;
+ }
- // execute each matching rule(s)
- match = executeActions(session, context, rules, defaultActions, match);
+ // if there is update criteria specified, evaluate it
+ if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules, statefulAttributes)) {
+ // apply the actions for each rule and transfer the flowfile
+ for (final Map.Entry> entry : matchedRules.entrySet()) {
+ FlowFile match = entry.getKey();
+ final List rules = entry.getValue();
+
+ // execute each matching rule(s)
+ try {
+ match = executeActions(session, context, rules, defaultActions, match, statefulAttributes);
logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
// transfer the match
session.getProvenanceReporter().modifyAttributes(match);
session.transfer(match, REL_SUCCESS);
+ } catch (IOException e) {
+ logger.error("Failed to update attributes for {} due to a failure to set the state afterwards; transferring to '{}'", new Object[]{match, REL_FAILED_SET_STATE.getName()}, e);
+ session.transfer(match, REL_FAILED_SET_STATE);
+ return;
}
- } else {
- // transfer the flowfile to no match (that has the default actions applied)
- flowFile = executeActions(session, context, null, defaultActions, flowFile);
+ }
+ } else {
+ // transfer the flowfile to no match (that has the default actions applied)
+ try {
+ flowFile = executeActions(session, context, null, defaultActions, flowFile, statefulAttributes);
logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{flowFile, REL_SUCCESS.getName()});
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
+ } catch (IOException e) {
+ logger.error("Failed to update attributes for {} due to failures setting state afterwards; transferring to '{}'", new Object[]{flowFile, REL_FAILED_SET_STATE.getName()}, e);
+ session.transfer(flowFile, REL_FAILED_SET_STATE);
+ return;
}
}
}
//Evaluates the specified Criteria on the specified flowfile. Clones the
// specified flow file for each rule that is applied.
- private boolean evaluateCriteria(final ProcessSession session, final ProcessContext context, final Criteria criteria, final FlowFile flowfile, final Map> matchedRules) {
- final ComponentLog logger = getLogger();
+ private boolean evaluateCriteria(final ProcessSession session, final ProcessContext context, final Criteria criteria, final FlowFile flowfile, final Map> matchedRules, final Map statefulAttributes) {
+ final ComponentLog logger = getLogger();
final List rules = criteria.getRules();
// consider each rule and hold a copy of the flowfile for each matched rule
for (final Rule rule : rules) {
// evaluate the rule
- if (evaluateRule(context, rule, flowfile)) {
+ if (evaluateRule(context, rule, flowfile, statefulAttributes)) {
final FlowFile flowfileToUse;
// determine if we should use the original flow file or clone
@@ -421,12 +500,12 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
//Evaluates the specified rule on the specified flowfile.
- private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile) {
+ private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile, final Map statefulAttributes) {
// go through each condition
for (final Condition condition : rule.getConditions()) {
// fail if any condition is not met
- if (!evaluateCondition(context, condition, flowfile)) {
+ if (!evaluateCondition(context, condition, flowfile, statefulAttributes)) {
return false;
}
}
@@ -447,19 +526,20 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
return currentValue;
}
- //Evaluates the specified condition on the specified flowfile.
- private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile) {
+ // Evaluates the specified condition on the specified flowfile.
+ private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile, final Map statefulAttributes) {
try {
// evaluate the expression for the given flow file
- return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile).asBoolean();
+ return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).asBoolean();
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to evaluate condition '%s': %s.", condition.getExpression(), pe), pe);
}
}
// Executes the specified action on the specified flowfile.
- private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List rules, final Map defaultActions, final FlowFile flowfile) {
- final ComponentLog logger = getLogger();
+ private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List rules, final Map defaultActions, final FlowFile flowfile,
+ final Map statefulAttributes) throws IOException {
+ final ComponentLog logger = getLogger();
final Map actions = new HashMap<>(defaultActions);
final String ruleName = (rules == null || rules.isEmpty()) ? "default" : rules.get(rules.size() - 1).getName();
@@ -489,17 +569,32 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
final Map attributesToUpdate = new HashMap<>(actions.size());
final Set attributesToDelete = new HashSet<>(actions.size());
+ final Map statefulAttributesToSet;
+
+ if (statefulAttributes != null){
+ statefulAttributesToSet = new HashMap<>();
+ } else {
+ statefulAttributesToSet = null;
+ }
+
+
// go through each action
for (final Action action : actions.values()) {
if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) {
try {
- final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile).getValue();
+ final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue();
// log if appropriate
if (logger.isDebugEnabled()) {
logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName));
}
+ if (statefulAttributesToSet != null) {
+ if(!action.getAttribute().equals("UpdateAttribute.matchedRule")) {
+ statefulAttributesToSet.put(action.getAttribute(), newAttributeValue);
+ }
+ }
+
attributesToUpdate.put(action.getAttribute(), newAttributeValue);
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe);
@@ -545,8 +640,14 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
}
- // update and delete the flowfile attributes
- return session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete);
+ // update and delete the FlowFile attributes
+ FlowFile returnFlowfile = session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete);
+
+ if(statefulAttributesToSet != null) {
+ context.getStateManager().setState(statefulAttributesToSet, Scope.LOCAL);
+ }
+
+ return returnFlowfile;
}
// Gets the default actions.
@@ -554,10 +655,12 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
final Map defaultActions = new HashMap<>();
for (final Map.Entry entry : properties.entrySet()) {
- final Action action = new Action();
- action.setAttribute(entry.getKey().getName());
- action.setValue(entry.getValue());
- defaultActions.put(action.getAttribute(), action);
+ if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) {
+ final Action action = new Action();
+ action.setAttribute(entry.getKey().getName());
+ action.setValue(entry.getValue());
+ defaultActions.put(action.getAttribute(), action);
+ }
}
return defaultActions;
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
index cd4d34fca8..8a60c8fb87 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
@@ -247,6 +247,71 @@
Once all changes have been saved in the Advanced UI, the UI can be closed using the X in the top right corner.
+
+ Stateful Usage
+
+
+
+ By selecting "store state locally" option for the "Store State" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but
+ also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be
+ referenced as a stateful variable like so:
+
+
+ - Dynamic Property
+
+ - key : theCount
+ - value : ${getStateValue("theCount"):plus(1)}
+
+
+
+
+ This example will keep a count of the total number of FlowFiles that have passed through the processor. To use logic on top of State, simply use the "Advanced Usage" of UpdateAttribute.
+ All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the
+ flow so far. This would be done by having a condition of "${getStateValue("maxValue"):lt(${value})}" and an action of attribute:"maxValue", value:"${value}".
+
+ The "Stateful Variables Initial Value" property is used to initialize the stateful variables and is required to be set if running statefully. Some logic rules will require a very high initial value, like using the Advanced rules to
+ determine the minimum value.
+
+
+ If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind. For example, attempting to calculate the
+ average of the incoming stream requires the sum and count. If all three properties are set in the same UpdateAttribute (like below) then the Average will always not include the most
+ recent values of count and sum:
+
+
+ - Count
+
+ - key : theCount
+ - value : ${getStateValue("theCount"):plus(1)}
+
+
+
+ - Sum
+
+ - key : theSum
+ - value : ${getStateValue("theSum"):plus(${flowfileValue})}
+
+
+
+ - Average
+
+ - key : theAverage
+ - value : ${getStateValue("theSum"):divide(getStateValue("theCount"))}
+
+
+
+
+ Instead, since average only relies on theCount and theSum attributes (which are added to the FlowFile as well) there should be a following Stateless UpdateAttribute which properly
+ calculates the average.
+
+ In the event that the processor is unable to get the state at the beginning of the onTrigger, the FlowFile will be pushed back to the originating relationship and the processor will yield.
+ If the processor is able to get the state at the beginning of the onTrigger but unable to set the state after adding attributes to the FlowFile, the FlowFile will be transferred to
+ "set state fail". This is normally due to the state not being the most up to date version (another thread has replaced the state with another version). In most use-cases this relationship
+ should loop back to the processor since the only affected attributes will be overwritten.
+
+ Note: Currently the only "stateful" option is to store state locally. This is done because the current implementation of clustered state relies on Zookeeper and Zookeeper isn't designed
+ for the type of load/throughput UpdateAttribute with state would demand. In the future, if/when multiple different clustered state options are added, UpdateAttribute will be updated.
+
+
Properties:
@@ -267,7 +332,13 @@
success
- If the processor successfully updates the specified attribute(s), then the FlowFile follows this relationship.
-
+
+
+ set state fail
+
+ - If the processor is running statefully, and fails to set the state after adding attributes to the FlowFile, then the FlowFile will be routed to this relationship.
+
+