NIFI-1582 added state to UpdateAttribute as well as updated a few parts that hadn't be touched in years (referenced the 'FlowFileMetadataEnhancer' processor'. Also added a 'NUMBER_VALIDATOR' to StandardValidators

NIFI-1582 removing the option to use cluster state

NIFI-1582 addressing Oleg's comments

NIFI-1582 No longer forcing numbers as the init value and adding getStateValue() to EL instead of using 'ATTRIBUTE_state'

NIFI-1582 Removing init state value

NIFI-1582 Adding documentation for the changes to Init State value

This closes #319
This commit is contained in:
jpercivall 2016-04-01 17:27:42 -04:00 committed by Matt Burgess
parent 97d2d30423
commit e36b37692c
19 changed files with 819 additions and 131 deletions

View File

@ -257,6 +257,29 @@ public interface PropertyValue {
*/
PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression
* Language; a PropertyValue with the new value is then returned, supporting
* call chaining.
* </p>
*
* @param flowFile to evaluate attributes of
* @param additionalAttributes a Map of additional attributes that the Expression can reference. If entries in
* this Map conflict with entries in the FlowFile's attributes, the entries in this Map are given a higher priority.
* @param decorator the decorator to use in order to update the values returned by the Expression Language
* @param stateValues a Map of the state values to be referenced explicitly by specific state accessing functions
*
* @return a PropertyValue with the new value is returned, supporting call
* chaining
*
* @throws ProcessException if the Expression cannot be compiled or evaluating
* the Expression against the given attributes causes an Exception to be thrown
*/
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator, Map<String, String> stateValues)
throws ProcessException;
/**
* <p>
* Replaces values in the Property Value using the NiFi Expression Language.

View File

@ -137,6 +137,7 @@ UNESCAPE_HTML3 : 'unescapeHtml3';
UNESCAPE_HTML4 : 'unescapeHtml4';
BASE64_ENCODE : 'base64Encode';
BASE64_DECODE : 'base64Decode';
GET_STATE_VALUE: 'getStateValue';
// 1 arg functions
SUBSTRING_AFTER : 'substringAfter';

View File

@ -129,7 +129,7 @@ functionCall : functionRef ->
booleanLiteral : TRUE | FALSE;
zeroArgStandaloneFunction : (IP | UUID | NOW | NEXT_INT | HOSTNAME | RANDOM) LPAREN! RPAREN!;
oneArgStandaloneFunction : ((TO_LITERAL | MATH)^ LPAREN! anyArg RPAREN!) |
oneArgStandaloneFunction : ((TO_LITERAL | MATH | GET_STATE_VALUE)^ LPAREN! anyArg RPAREN!) |
(HOSTNAME^ LPAREN! booleanLiteral RPAREN!);
standaloneFunction : zeroArgStandaloneFunction | oneArgStandaloneFunction;

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.attribute.expression.language;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/*
*This class is passed to Evaluators so that certain evaluators that specifically work with state will have access to the state values explicitly.
*It implements Map so that other evaluators don't have to be changed.
*/
public class AttributesAndState implements Map<String, String> {
private final Map<String, String> stateMap;
private final Map<String, String> attributes;
public AttributesAndState(Map<String, String> attributes, Map<String, String> state) {
super();
stateMap = state;
this.attributes = attributes;
}
public Map<String, String> getStateMap() {
return stateMap;
}
@Override
public int size() {
return attributes.size();
}
@Override
public boolean isEmpty() {
return attributes.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return attributes.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
return attributes.containsValue(value);
}
@Override
public String get(Object key) {
return attributes.get(key);
}
@Override
public String put(String key, String value) {
throw new UnsupportedOperationException();
}
@Override
public String remove(Object key) {
throw new UnsupportedOperationException();
}
@Override
public void putAll(Map<? extends String, ? extends String> m) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
throw new UnsupportedOperationException();
}
@Override
public Set<String> keySet() {
return attributes.keySet();
}
@Override
public Collection<String> values() {
return attributes.values();
}
@Override
public Set<Entry<String, String>> entrySet() {
return attributes.entrySet();
}
}

View File

@ -33,4 +33,9 @@ public class EmptyPreparedQuery implements PreparedQuery {
public String evaluateExpressions(Map<String, String> valueLookup, AttributeValueDecorator decorator) throws ProcessException {
return value;
}
@Override
public String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator, Map<String, String> stateVariables) throws ProcessException {
return value;
}
}

View File

@ -43,5 +43,8 @@ public class InvalidPreparedQuery implements PreparedQuery {
throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation);
}
@Override
public String evaluateExpressions( Map<String, String> valueLookup, AttributeValueDecorator decorator, Map<String, String> stateVariables) throws ProcessException {
throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation);
}
}

View File

@ -25,4 +25,5 @@ public interface PreparedQuery {
String evaluateExpressions(Map<String, String> valueLookup, AttributeValueDecorator decorator) throws ProcessException;
String evaluateExpressions(final Map<String, String> valueLookup, final AttributeValueDecorator decorator, final Map<String, String> stateVariables) throws ProcessException;
}

View File

@ -49,6 +49,7 @@ import org.apache.nifi.attribute.expression.language.evaluation.functions.FindEv
import org.apache.nifi.attribute.expression.language.evaluation.functions.FormatEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.FromRadixEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.GetDelimitedFieldEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.GetStateVariableEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.GreaterThanEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.GreaterThanOrEqualEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.functions.HostnameEvaluator;
@ -149,6 +150,7 @@ import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpre
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.FIND;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.FORMAT;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GET_DELIMITED_FIELD;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GET_STATE_VALUE;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GREATER_THAN;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GREATER_THAN_OR_EQUAL;
import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.HOSTNAME;
@ -384,8 +386,9 @@ public class Query {
return -1;
}
static String evaluateExpression(final Tree tree, final String queryText, final Map<String, String> valueMap, final AttributeValueDecorator decorator) throws ProcessException {
final Object evaluated = Query.fromTree(tree, queryText).evaluate(valueMap).getValue();
static String evaluateExpression(final Tree tree, final String queryText, final Map<String, String> valueMap, final AttributeValueDecorator decorator,
final Map<String, String> stateVariables) throws ProcessException {
final Object evaluated = Query.fromTree(tree, queryText).evaluate(valueMap, stateVariables).getValue();
if (evaluated == null) {
return null;
}
@ -395,6 +398,11 @@ public class Query {
return decorator == null ? escaped : decorator.decorate(escaped);
}
static String evaluateExpressions(final String rawValue, Map<String, String> expressionMap, final AttributeValueDecorator decorator, final Map<String, String> stateVariables)
throws ProcessException {
return Query.prepare(rawValue).evaluateExpressions(expressionMap, decorator, stateVariables);
}
static String evaluateExpressions(final String rawValue, final Map<String, String> valueLookup) throws ProcessException {
return evaluateExpressions(rawValue, valueLookup, null);
}
@ -563,12 +571,21 @@ public class Query {
}
QueryResult<?> evaluate(final Map<String, String> map) {
return evaluate(map, null);
}
QueryResult<?> evaluate(final Map<String, String> attributes, final Map<String, String> stateMap) {
if (evaluated.getAndSet(true)) {
throw new IllegalStateException("A Query cannot be evaluated more than once");
}
return evaluator.evaluate(map);
if (stateMap != null) {
AttributesAndState attributesAndState = new AttributesAndState(attributes, stateMap);
return evaluator.evaluate(attributesAndState);
} else {
return evaluator.evaluate(attributes);
}
}
Tree getTree() {
return this.tree;
@ -747,6 +764,12 @@ public class Query {
throw new AttributeExpressionLanguageParsingException("Call to math() as the subject must take exactly 1 parameter");
}
}
case GET_STATE_VALUE: {
final Tree childTree = tree.getChild(0);
final Evaluator<?> argEvaluator = buildEvaluator(childTree);
final Evaluator<String> stringEvaluator = toStringEvaluator(argEvaluator);
return new GetStateVariableEvaluator(stringEvaluator);
}
default:
throw new AttributeExpressionLanguageParsingException("Unexpected token: " + tree.toString());
}

View File

@ -38,14 +38,14 @@ public class StandardPreparedQuery implements PreparedQuery {
@Override
public String evaluateExpressions(final Map<String, String> valueMap, final AttributeValueDecorator decorator) throws ProcessException {
public String evaluateExpressions(final Map<String, String> valMap, final AttributeValueDecorator decorator, final Map<String, String> stateVariables) throws ProcessException {
final StringBuilder sb = new StringBuilder();
for (final String val : queryStrings) {
final Tree tree = trees.get(val);
if (tree == null) {
sb.append(val);
} else {
final String evaluated = Query.evaluateExpression(tree, val, valueMap, decorator);
final String evaluated = Query.evaluateExpression(tree, val, valMap, decorator, stateVariables);
if (evaluated != null) {
sb.append(evaluated);
}
@ -54,4 +54,9 @@ public class StandardPreparedQuery implements PreparedQuery {
return sb.toString();
}
@Override
public String evaluateExpressions(final Map<String, String> valMap, final AttributeValueDecorator decorator)
throws ProcessException {
return evaluateExpressions(valMap, decorator, null);
}
}

View File

@ -145,11 +145,17 @@ public class StandardPropertyValue implements PropertyValue {
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator, Map<String, String> stateValues)
throws ProcessException {
if (rawValue == null || preparedQuery == null) {
return this;
}
final ValueLookup lookup = new ValueLookup(variableRegistry, flowFile, additionalAttributes);
return new StandardPropertyValue(preparedQuery.evaluateExpressions(lookup, decorator), serviceLookup, null, variableRegistry);
return new StandardPropertyValue(preparedQuery.evaluateExpressions(lookup, decorator, stateValues), serviceLookup, null);
}
@Override

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.attribute.expression.language.evaluation.functions;
import org.apache.nifi.attribute.expression.language.AttributesAndState;
import org.apache.nifi.attribute.expression.language.evaluation.Evaluator;
import org.apache.nifi.attribute.expression.language.evaluation.QueryResult;
import org.apache.nifi.attribute.expression.language.evaluation.StringEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.StringQueryResult;
import java.util.Map;
public class GetStateVariableEvaluator extends StringEvaluator {
private final Evaluator<String> subject;
public GetStateVariableEvaluator(final Evaluator<String> subject) {
this.subject = subject;
}
@Override
public QueryResult<String> evaluate(Map<String, String> attributes) {
if (!(attributes instanceof AttributesAndState)){
return new StringQueryResult(null);
}
final String subjectValue = subject.evaluate(attributes).getValue();
if (subjectValue == null) {
return new StringQueryResult(null);
}
AttributesAndState attributesAndState = (AttributesAndState) attributes;
Map<String, String> stateMap = attributesAndState.getStateMap();
String stateValue = stateMap.get(subjectValue);
return new StringQueryResult(stateValue);
}
@Override
public Evaluator<String> getSubjectEvaluator() {
return subject;
}
}

View File

@ -72,6 +72,7 @@ public class TestQuery {
assertValid("${hostname()}");
assertValid("${literal(3)}");
assertValid("${random()}");
assertValid("${getStateValue('the_count')}");
// left here because it's convenient for looking at the output
//System.out.println(Query.compile("").evaluate(null));
}
@ -1488,6 +1489,32 @@ public class TestQuery {
assertEquals("{ xyz }", Query.evaluateExpressions(query, attributes));
}
@Test
public void testGetStateValue() {
final Map<String, String> stateValues = new HashMap<>();
stateValues.put("abc", "xyz");
stateValues.put("123", "qwe");
stateValues.put("true", "asd");
stateValues.put("iop", "098");
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "iop");
attributes.put("4321", "123");
attributes.put("false", "bnm");
String query = "${getStateValue('abc')}";
verifyEquals(query, attributes, stateValues, "xyz");
query = "${getStateValue(${'4321':toString()})}";
verifyEquals(query, attributes, stateValues, "qwe");
query = "${getStateValue(${literal(true):toString()})}";
verifyEquals(query, attributes, stateValues, "asd");
query = "${getStateValue(${abc}):equals('098')}";
verifyEquals(query, attributes, stateValues, true);
}
@Test
public void testLiteralFunction() {
final Map<String, String> attrs = Collections.<String, String>emptyMap();
@ -1658,11 +1685,15 @@ public class TestQuery {
}
private void verifyEquals(final String expression, final Map<String, String> attributes, final Object expectedResult) {
verifyEquals(expression,attributes, null, expectedResult);
}
private void verifyEquals(final String expression, final Map<String, String> attributes, final Map<String, String> stateValues, final Object expectedResult) {
Query.validateExpression(expression, false);
assertEquals(String.valueOf(expectedResult), Query.evaluateExpressions(expression, attributes, null));
assertEquals(String.valueOf(expectedResult), Query.evaluateExpressions(expression, attributes, null, stateValues));
final Query query = Query.compile(expression);
final QueryResult<?> result = query.evaluate(attributes);
final QueryResult<?> result = query.evaluate(attributes, stateValues);
if (expectedResult instanceof Long) {
if (ResultType.NUMBER.equals(result.getResultType())) {

View File

@ -23,6 +23,8 @@ import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.time.Instant;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@ -126,6 +128,24 @@ public class StandardValidators {
}
};
public static final Validator NUMBER_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
String reason = null;
try {
NumberFormat.getInstance().parse(value);
} catch (ParseException e) {
reason = "not a valid Number";
}
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
}
};
public static final Validator PORT_VALIDATOR = createLongValidator(1, 65535, true);
/**

View File

@ -1974,6 +1974,22 @@ an error when validating the function.
`${literal( ${allMatchingAttributes('a.*'):count()} ):gt(3)}` returns true if there are more than 3 attributes whose
names begin with the letter `a`.
[.function]
=== getStateValue
*Description*: [.description]#Access a processor's state values by passing in the String key and getting the value back as a String. This
is a special Expression Language function that only works with processors that explicitly allow EL to query state. Currently only UpdateAttribute
does.#
*Subject Type*: [.subjectless]#No Subject#
*Arguments*:
- [.String]#_Key_# : [.argDesc]#The key to use when accessing the state map.#
*Return Type*: [.returnType]#String#
*Examples*: UpdateAttribute processor has stored the key "count" with value "20" in state. '${getStateValue("count")}` returns `20`.
[[multi]]
== Evaluating Multiple Attributes

View File

@ -171,12 +171,18 @@ public class MockPropertyValue implements PropertyValue {
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator, Map<String, String> stateValues)
throws ProcessException {
markEvaluated();
if (rawValue == null) {
return this;
}
final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator);
final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, stateValues);
return new MockPropertyValue(newValue.getValue(), serviceLookup, propertyDescriptor, true, variableRegistry);
}

View File

@ -160,6 +160,12 @@ public class ConnectableProcessContext implements ProcessContext {
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator) throws ProcessException {
return null;
}
@Override
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator, Map<String, String> stateValues)
throws ProcessException {
return null;
}
};
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.attributes;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@ -37,6 +38,8 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -46,6 +49,9 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -66,71 +72,49 @@ import org.apache.nifi.update.attributes.FlowFilePolicy;
import org.apache.nifi.update.attributes.Rule;
import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
/**
* This processor supports updating flowfile attributes and can do so
* conditionally or unconditionally. It can also delete flowfile attributes
* that match a regular expression.
*
* Like the FlowFileMetadataEnhancer, it can
* be configured with an arbitrary number of optional properties to define how
* attributes should be updated. Each optional property represents an action
* that is applied to all incoming flow files. An action is comprised of an
* attribute key and a format string. The format string supports the following
* parameters.
* <ul>
* <li>%1 - is the random generated UUID. </li>
* <li>%2 - is the current calendar time. </li>
* <li>${"attribute.key") - is the flow file attribute value of the key
* contained within the brackets.</li>
* </ul>
*
* When creating the optional properties, enter the attribute key as the
* property name and the desired format string as the value. The optional
* properties are considered default actions and are applied unconditionally.
*
* In addition to the default actions, this processor has a user interface (UI)
* where conditional actions can be specified. In the UI, rules can be created.
* Rules are comprised of an arbitrary number of conditions and actions. In
* order for a rule to be activated, all conditions must evaluate to true.
*
* A rule condition is comprised of an attribute key and a regular expression. A
* condition evaluates to true when the flowfile contains the attribute
* specified and it's value matches the specified regular expression.
*
* A rule action follows the same definition as a rule above. It includes an
* attribute key and a format string. The format string supports the same
* parameters defined above.
*
* When a rule is activated (because conditions evaluate to true), all actions
* in that rule are executed. Once each action has been applied, any remaining
* default actions will be applied. This means that if rule action and a default
* action modify the same attribute, only the rule action will execute. Default
* actions will only execute when the attribute in question is not modified as
* part of an activated rule.
*
* The incoming flow file is cloned for each rule that is activated. If no rule
* is activated, any default actions are applied to the original flowfile and it
* is transferred.
*
* This processor only supports a SUCCESS relationship.
*
* Note: In order for configuration changes made in the custom UI to take
* effect, the processor must be stopped and started.
*/
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"attributes", "modification", "update", "delete", "Attribute Expression Language"})
@Tags({"attributes", "modification", "update", "delete", "Attribute Expression Language", "state"})
@CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language and/or deletes the attributes based on a regular expression")
@DynamicProperty(name = "A FlowFile attribute to update", value = "The value to set it to", supportsExpressionLanguage = true,
description = "Updates a FlowFile attribute specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")
@WritesAttribute(attribute = "See additional details", description = "This processor may write or remove zero or more attributes as described in additional details")
@Stateful(scopes = {Scope.LOCAL}, description = "Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner.")
public class UpdateAttribute extends AbstractProcessor implements Searchable {
public static final String DO_NOT_STORE_STATE = "do not store state";
public static final String STORE_STATE_LOCALLY = "store state locally";
private boolean stateful = false;
private final AtomicReference<Criteria> criteriaCache = new AtomicReference<>(null);
private final ConcurrentMap<String, PropertyValue> propertyValues = new ConcurrentHashMap<>();
private final Set<Relationship> relationships;
private final static Set<Relationship> statelessRelationshipSet;
private final static Set<Relationship> statefulRelationshipSet;
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("All successful FlowFiles are routed to this relationship").name("success").build();
public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder()
.description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here.").name("set state fail").build();
static {
Set<Relationship> tempStatelessSet = new HashSet<>();
tempStatelessSet.add(REL_SUCCESS);
statelessRelationshipSet = Collections.unmodifiableSet(tempStatelessSet);
Set<Relationship> tempStatefulSet = new HashSet<>();
tempStatefulSet.add(REL_SUCCESS);
tempStatefulSet.add(REL_FAILED_SET_STATE);
statefulRelationshipSet = Collections.unmodifiableSet(tempStatefulSet);
}
private volatile Set<Relationship> relationships;
private static final Validator DELETE_PROPERTY_VALIDATOR = new Validator() {
private final Validator DPV_RE_VALIDATOR = StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true);
@ -162,20 +146,32 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
// static properties
public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("Delete Attributes Expression")
.description("Regular expression for attributes to be deleted from flowfiles.")
.description("Regular expression for attributes to be deleted from FlowFiles.")
.required(false)
.addValidator(DELETE_PROPERTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("All FlowFiles are routed to this relationship").name("success").build();
public static final PropertyDescriptor STORE_STATE = new PropertyDescriptor.Builder()
.name("Store State")
.description("Select whether or not state will be stored. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " +
"FlowFile in a stateless manner. Selecting a stateful option will not only store the attributes on the FlowFile but also in the Processors " +
"state. See the 'Stateful Usage' topic of the 'Additional Details' section of this processor's documentation for more information")
.required(true)
.allowableValues(DO_NOT_STORE_STATE, STORE_STATE_LOCALLY)
.defaultValue(DO_NOT_STORE_STATE)
.build();
public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder()
.name("Stateful Variables Initial Value")
.description("If using state to set/reference variables then this value is used to set the initial value of the stateful variable. This will only be used in the @OnScheduled method " +
"when state does not contain a value for the variable. This is required if running statefully but can be empty if needed.")
.required(false)
.addValidator(Validator.VALID)
.build();
public UpdateAttribute() {
final Set<Relationship> relationshipSet = new HashSet<>();
relationshipSet.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(relationshipSet);
relationships = statelessRelationshipSet;
}
@Override
@ -187,30 +183,96 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(DELETE_ATTRIBUTES);
descriptors.add(STORE_STATE);
descriptors.add(STATEFUL_VARIABLES_INIT_VALUE);
return Collections.unmodifiableList(descriptors);
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
PropertyDescriptor.Builder propertyBuilder = new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.dynamic(true);
if (stateful) {
return propertyBuilder
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.build();
} else {
return propertyBuilder
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
if (descriptor.equals(STORE_STATE)) {
if (DO_NOT_STORE_STATE.equals(newValue)){
stateful = false;
relationships = statelessRelationshipSet;
} else {
stateful = true;
relationships = statefulRelationshipSet;
}
}
}
@OnScheduled
public void clearPropertyValueMap() {
public void onScheduled(final ProcessContext context) throws IOException {
criteriaCache.set(CriteriaSerDe.deserialize(context.getAnnotationData()));
propertyValues.clear();
if(stateful) {
StateManager stateManager = context.getStateManager();
StateMap state = stateManager.getState(Scope.LOCAL);
HashMap<String, String> tempMap = new HashMap<>();
tempMap.putAll(state.toMap());
String initValue = context.getProperty(STATEFUL_VARIABLES_INIT_VALUE).getValue();
// Initialize the stateful default actions
for (PropertyDescriptor entry : context.getProperties().keySet()) {
if (entry.isDynamic()) {
if(!tempMap.containsKey(entry.getName())) {
tempMap.put(entry.getName(), initValue);
}
}
}
// Initialize the stateful actions if the criteria exists
final Criteria criteria = criteriaCache.get();
if (criteria != null) {
for (Rule rule : criteria.getRules()) {
for (Action action : rule.getActions()) {
if (!tempMap.containsKey(action.getAttribute())) {
tempMap.put(action.getAttribute(), initValue);
}
}
}
}
context.getStateManager().setState(tempMap, Scope.LOCAL);
}
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> reasons = new ArrayList<>(super.customValidate(context));
if (!context.getProperty(STORE_STATE).getValue().equals(DO_NOT_STORE_STATE)){
String initValue = context.getProperty(STATEFUL_VARIABLES_INIT_VALUE).getValue();
if (initValue == null){
reasons.add(new ValidationResult.Builder().subject(STATEFUL_VARIABLES_INIT_VALUE.getDisplayName()).valid(false)
.explanation("initial state value much be set if the processor is configured to store state.").build());
}
}
Criteria criteria = null;
try {
criteria = CriteriaSerDe.deserialize(context.getAnnotationData());
@ -325,20 +387,12 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
}
@OnScheduled
public void parseAnnotationData(final ProcessContext context) {
criteriaCache.set(CriteriaSerDe.deserialize(context.getAnnotationData()));
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final ComponentLog logger = getLogger();
final Criteria criteria = criteriaCache.get();
List<FlowFile> flowFiles = session.get(100);
if (flowFiles.isEmpty()) {
return;
}
FlowFile flowFile = session.get();
final Map<PropertyDescriptor, String> properties = context.getProperties();
@ -353,45 +407,70 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
// because is the original flowfile is used for all matching rules. in this
// case the order of the matching rules is preserved in the list
final Map<FlowFile, List<Rule>> matchedRules = new HashMap<>();
Map<String, String> statefulAttributes = null;
for (FlowFile flowFile : flowFiles) {
matchedRules.clear();
try {
if (stateful) {
statefulAttributes = new HashMap<>(context.getStateManager().getState(Scope.LOCAL).toMap());
} else {
statefulAttributes = null;
}
} catch (IOException e) {
logger.error("Failed to update attributes for {} due to failing to get state; transferring FlowFile back to '{}'", new Object[]{flowFile, Relationship.SELF.getName()}, e);
session.transfer(flowFile);
context.yield();
return;
}
// if there is update criteria specified, evaluate it
if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules)) {
if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules, statefulAttributes)) {
// apply the actions for each rule and transfer the flowfile
for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) {
FlowFile match = entry.getKey();
final List<Rule> rules = entry.getValue();
// execute each matching rule(s)
match = executeActions(session, context, rules, defaultActions, match);
try {
match = executeActions(session, context, rules, defaultActions, match, statefulAttributes);
logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
// transfer the match
session.getProvenanceReporter().modifyAttributes(match);
session.transfer(match, REL_SUCCESS);
} catch (IOException e) {
logger.error("Failed to update attributes for {} due to a failure to set the state afterwards; transferring to '{}'", new Object[]{match, REL_FAILED_SET_STATE.getName()}, e);
session.transfer(match, REL_FAILED_SET_STATE);
return;
}
}
} else {
// transfer the flowfile to no match (that has the default actions applied)
flowFile = executeActions(session, context, null, defaultActions, flowFile);
try {
flowFile = executeActions(session, context, null, defaultActions, flowFile, statefulAttributes);
logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{flowFile, REL_SUCCESS.getName()});
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
} catch (IOException e) {
logger.error("Failed to update attributes for {} due to failures setting state afterwards; transferring to '{}'", new Object[]{flowFile, REL_FAILED_SET_STATE.getName()}, e);
session.transfer(flowFile, REL_FAILED_SET_STATE);
return;
}
}
}
//Evaluates the specified Criteria on the specified flowfile. Clones the
// specified flow file for each rule that is applied.
private boolean evaluateCriteria(final ProcessSession session, final ProcessContext context, final Criteria criteria, final FlowFile flowfile, final Map<FlowFile, List<Rule>> matchedRules) {
private boolean evaluateCriteria(final ProcessSession session, final ProcessContext context, final Criteria criteria, final FlowFile flowfile, final Map<FlowFile,
List<Rule>> matchedRules, final Map<String, String> statefulAttributes) {
final ComponentLog logger = getLogger();
final List<Rule> rules = criteria.getRules();
// consider each rule and hold a copy of the flowfile for each matched rule
for (final Rule rule : rules) {
// evaluate the rule
if (evaluateRule(context, rule, flowfile)) {
if (evaluateRule(context, rule, flowfile, statefulAttributes)) {
final FlowFile flowfileToUse;
// determine if we should use the original flow file or clone
@ -421,12 +500,12 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
//Evaluates the specified rule on the specified flowfile.
private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile) {
private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile, final Map<String, String> statefulAttributes) {
// go through each condition
for (final Condition condition : rule.getConditions()) {
// fail if any condition is not met
if (!evaluateCondition(context, condition, flowfile)) {
if (!evaluateCondition(context, condition, flowfile, statefulAttributes)) {
return false;
}
}
@ -448,17 +527,18 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
// Evaluates the specified condition on the specified flowfile.
private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile) {
private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile, final Map<String, String> statefulAttributes) {
try {
// evaluate the expression for the given flow file
return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile).asBoolean();
return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).asBoolean();
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to evaluate condition '%s': %s.", condition.getExpression(), pe), pe);
}
}
// Executes the specified action on the specified flowfile.
private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List<Rule> rules, final Map<String, Action> defaultActions, final FlowFile flowfile) {
private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List<Rule> rules, final Map<String, Action> defaultActions, final FlowFile flowfile,
final Map<String, String> statefulAttributes) throws IOException {
final ComponentLog logger = getLogger();
final Map<String, Action> actions = new HashMap<>(defaultActions);
final String ruleName = (rules == null || rules.isEmpty()) ? "default" : rules.get(rules.size() - 1).getName();
@ -489,17 +569,32 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
final Map<String, String> attributesToUpdate = new HashMap<>(actions.size());
final Set<String> attributesToDelete = new HashSet<>(actions.size());
final Map<String, String> statefulAttributesToSet;
if (statefulAttributes != null){
statefulAttributesToSet = new HashMap<>();
} else {
statefulAttributesToSet = null;
}
// go through each action
for (final Action action : actions.values()) {
if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) {
try {
final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile).getValue();
final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue();
// log if appropriate
if (logger.isDebugEnabled()) {
logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName));
}
if (statefulAttributesToSet != null) {
if(!action.getAttribute().equals("UpdateAttribute.matchedRule")) {
statefulAttributesToSet.put(action.getAttribute(), newAttributeValue);
}
}
attributesToUpdate.put(action.getAttribute(), newAttributeValue);
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe);
@ -545,8 +640,14 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
}
// update and delete the flowfile attributes
return session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete);
// update and delete the FlowFile attributes
FlowFile returnFlowfile = session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete);
if(statefulAttributesToSet != null) {
context.getStateManager().setState(statefulAttributesToSet, Scope.LOCAL);
}
return returnFlowfile;
}
// Gets the default actions.
@ -554,11 +655,13 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
final Map<String, Action> defaultActions = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) {
final Action action = new Action();
action.setAttribute(entry.getKey().getName());
action.setValue(entry.getValue());
defaultActions.put(action.getAttribute(), action);
}
}
return defaultActions;
}

View File

@ -247,6 +247,71 @@
Once all changes have been saved in the Advanced UI, the UI can be closed using the X in the top right corner.
</p>
<p>
<strong>Stateful Usage</strong>
</p>
<p>
By selecting "store state locally" option for the "Store State" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but
also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be
referenced as a stateful variable like so:
<ul>
<li>Dynamic Property
<ul>
<li>key : theCount</li>
<li>value : ${getStateValue("theCount"):plus(1)}</li>
</ul>
</li>
</ul>
This example will keep a count of the total number of FlowFiles that have passed through the processor. To use logic on top of State, simply use the "Advanced Usage" of UpdateAttribute.
All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the
flow so far. This would be done by having a condition of "${getStateValue("maxValue"):lt(${value})}" and an action of attribute:"maxValue", value:"${value}".
The "Stateful Variables Initial Value" property is used to initialize the stateful variables and is required to be set if running statefully. Some logic rules will require a very high initial value, like using the Advanced rules to
determine the minimum value.
If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind. For example, attempting to calculate the
average of the incoming stream requires the sum and count. If all three properties are set in the same UpdateAttribute (like below) then the Average will always not include the most
recent values of count and sum:
<ul>
<li>Count
<ul>
<li>key : theCount</li>
<li>value : ${getStateValue("theCount"):plus(1)}</li>
</ul>
</li>
<li>Sum
<ul>
<li>key : theSum</li>
<li>value : ${getStateValue("theSum"):plus(${flowfileValue})}</li>
</ul>
</li>
<li>Average
<ul>
<li>key : theAverage</li>
<li>value : ${getStateValue("theSum"):divide(getStateValue("theCount"))}</li>
</ul>
</li>
</ul>
Instead, since average only relies on theCount and theSum attributes (which are added to the FlowFile as well) there should be a following Stateless UpdateAttribute which properly
calculates the average.
In the event that the processor is unable to get the state at the beginning of the onTrigger, the FlowFile will be pushed back to the originating relationship and the processor will yield.
If the processor is able to get the state at the beginning of the onTrigger but unable to set the state after adding attributes to the FlowFile, the FlowFile will be transferred to
"set state fail". This is normally due to the state not being the most up to date version (another thread has replaced the state with another version). In most use-cases this relationship
should loop back to the processor since the only affected attributes will be overwritten.
Note: Currently the only "stateful" option is to store state locally. This is done because the current implementation of clustered state relies on Zookeeper and Zookeeper isn't designed
for the type of load/throughput UpdateAttribute with state would demand. In the future, if/when multiple different clustered state options are added, UpdateAttribute will be updated.
</p>
<p>
<strong>Properties:</strong>
</p>
@ -267,7 +332,13 @@
<li>success
<ul>
<li>If the processor successfully updates the specified attribute(s), then the FlowFile follows this relationship.</li>
</ul></li>
</ul>
</li>
<li>set state fail
<ul>
<li>If the processor is running statefully, and fails to set the state after adding attributes to the FlowFile, then the FlowFile will be routed to this relationship.</li>
</ul>
</li>
</ul>
</body>

View File

@ -34,6 +34,7 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import static org.apache.nifi.processors.attributes.UpdateAttribute.STORE_STATE_LOCALLY;
import static org.junit.Assert.assertEquals;
/**
@ -105,17 +106,223 @@ public class TestUpdateAttribute {
@Test
public void testDefaultAddAttribute() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("NewAttr", "abc${'Hello${Goose}'}!");
runner.setProperty("NewAttr", "${one:plus(${two})}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("Goose", "Geese");
attributes.put("HelloGeese", "123");
attributes.put("one", "1");
attributes.put("two", "2");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(0).assertAttributeEquals("NewAttr", "abc123!");
runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(0).assertAttributeEquals("NewAttr", "3");
}
@Test
public void testBasicState() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
runner.setProperty("count", "${getStateValue('count'):plus(1)}");
runner.setProperty("sum", "${getStateValue('sum'):plus(${pencils})}");
runner.assertNotValid();
runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0");
runner.assertValid();
final Map<String, String> attributes2 = new HashMap<>();
attributes2.put("pencils", "2");
runner.enqueue(new byte[0],attributes2);
runner.enqueue(new byte[0],attributes2);
final Map<String, String> attributes3 = new HashMap<>();
attributes3.put("pencils", "3");
runner.enqueue(new byte[0], attributes3);
runner.enqueue(new byte[0], attributes3);
final Map<String, String> attributes5 = new HashMap<>();
attributes5.put("pencils", "5");
runner.enqueue(new byte[0], attributes5);
runner.run(5);
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 5);
runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(4).assertAttributeEquals("count", "5");
runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(4).assertAttributeEquals("sum", "15");
}
@Test
public void testStateWithInitValue() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "10");
runner.setProperty("count", "${getStateValue('count'):plus(1)}");
runner.setProperty("sum", "${getStateValue('sum'):plus(${pencils})}");
runner.assertValid();
final Map<String, String> attributes2 = new HashMap<>();
attributes2.put("pencils", "2");
runner.enqueue(new byte[0],attributes2);
runner.enqueue(new byte[0],attributes2);
final Map<String, String> attributes3 = new HashMap<>();
attributes3.put("pencils", "3");
runner.enqueue(new byte[0], attributes3);
runner.enqueue(new byte[0], attributes3);
final Map<String, String> attributes5 = new HashMap<>();
attributes5.put("pencils", "5");
runner.enqueue(new byte[0], attributes5);
runner.run(5);
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 5);
runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(4).assertAttributeEquals("count", "15");
runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(4).assertAttributeEquals("sum", "25");
}
@Test
public void testRuleHitWithState() throws Exception {
final Criteria criteria = getCriteria();
addRule(criteria, "rule", Arrays.asList(
// conditions
"${getStateValue('maxValue'):lt(${value})}"), getMap(
// actions
"maxValue", "${value}"));
TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0");
runner.setAnnotationData(serialize(criteria));
final Map<String, String> attributes = new HashMap<>();
attributes.put("value", "1");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "2");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "4");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "1");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 4);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
result.get(2).assertAttributeEquals("maxValue", "4");
result.get(3).assertAttributeEquals("maxValue", null);
}
@Test
public void testRuleHitWithStateWithDefault() throws Exception {
final Criteria criteria = getCriteria();
addRule(criteria, "rule", Arrays.asList(
// conditions
"${getStateValue('maxValue'):lt(${value})}"), getMap(
// actions
"maxValue", "${value}"));
TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0");
runner.setAnnotationData(serialize(criteria));
runner.setProperty("maxValue", "${getStateValue('maxValue')}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("value", "1");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "2");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "4");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "1");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 4);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
result.get(2).assertAttributeEquals("maxValue", "4");
result.get(3).assertAttributeEquals("maxValue", "4");
}
@Test
public void testRuleHitWithStateWithInitValue() throws Exception {
final Criteria criteria = getCriteria();
addRule(criteria, "rule", Arrays.asList(
// conditions
"${getStateValue('minValue'):ge(${value})}"), getMap(
// actions
"minValue", "${value}"));
TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "5");
runner.setAnnotationData(serialize(criteria));
final Map<String, String> attributes = new HashMap<>();
attributes.put("value", "1");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "2");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "4");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "1");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 4);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
result.get(3).assertAttributeEquals("minValue", "1");
}
@Test
public void testMultipleRulesWithStateAndDelete() throws Exception {
final Criteria criteria = getCriteria();
addRule(criteria, "rule", Arrays.asList(
// conditions
"${getStateValue('maxValue'):lt(${value})}"), getMap(
// actions
"maxValue", "${value}"));
TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "badValue");
runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0");
runner.setAnnotationData(serialize(criteria));
runner.setProperty("maxValue", "${getStateValue('maxValue')}");
runner.setProperty("theCount", "${getStateValue('theCount'):plus(1)}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("value", "1");
attributes.put("badValue", "10");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "2");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "5");
runner.enqueue(new byte[0], attributes);
runner.run();
attributes.put("value", "1");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 4);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
result.get(3).assertAttributeEquals("maxValue", "5");
result.get(3).assertAttributeEquals("theCount", "4");
result.get(0).assertAttributeEquals("badValue", null);
}
@Test
@ -423,7 +630,7 @@ public class TestUpdateAttribute {
@Test
public void testSimpleDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "attribute.2");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute.2");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
@ -441,7 +648,7 @@ public class TestUpdateAttribute {
@Test
public void testRegexDotDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "attribute.2");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute.2");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
@ -461,7 +668,7 @@ public class TestUpdateAttribute {
@Test
public void testRegexLiteralDotDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "attribute\\.2");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute\\.2");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
@ -481,7 +688,7 @@ public class TestUpdateAttribute {
@Test
public void testRegexGroupDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "(attribute\\.[2-5]|sample.*)");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "(attribute\\.[2-5]|sample.*)");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
@ -507,7 +714,7 @@ public class TestUpdateAttribute {
@Test
public void testAttributeKey() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "(attribute\\.[2-5]|sample.*)");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "(attribute\\.[2-5]|sample.*)");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
@ -533,7 +740,7 @@ public class TestUpdateAttribute {
@Test
public void testExpressionLiteralDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "${literal('attribute\\.'):append(${literal(6)})}");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "${literal('attribute\\.'):append(${literal(6)})}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
@ -559,7 +766,7 @@ public class TestUpdateAttribute {
@Test
public void testExpressionRegexDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "${literal('(attribute\\.'):append(${literal('[2-5]')}):append(${literal('|sample.*)')})}");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "${literal('(attribute\\.'):append(${literal('[2-5]')}):append(${literal('|sample.*)')})}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
@ -585,7 +792,7 @@ public class TestUpdateAttribute {
@Test
public void testAttributeListDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "attribute.1|attribute.2|sample.1|simple.1");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute.1|attribute.2|sample.1|simple.1");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
@ -611,14 +818,14 @@ public class TestUpdateAttribute {
@Test
public void testInvalidRegex() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "(");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "(");
runner.assertNotValid();
}
@Test
public void testInvalidRegexInAttribute() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "${butter}");
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "${butter}");
runner.assertValid();
final Map<String, String> attributes = new HashMap<>();