NIFI-3249 - UpdateAttribute performance improvements

This closes #1356

Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
Bryan Rosander 2016-12-22 12:02:31 -05:00 committed by jpercivall
parent 16898668c2
commit 35e8bedcc8
2 changed files with 75 additions and 46 deletions

View File

@ -44,6 +44,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@ -144,16 +145,20 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
};
// static properties
public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression";
public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("Delete Attributes Expression")
.description("Regular expression for attributes to be deleted from FlowFiles.")
.name(DELETE_ATTRIBUTES_EXPRESSION_NAME)
.displayName(DELETE_ATTRIBUTES_EXPRESSION_NAME)
.description("Regular expression for attributes to be deleted from FlowFiles. Existing attributes that match will be deleted regardless of whether they are updated by this processor.")
.required(false)
.addValidator(DELETE_PROPERTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final String STORE_STATE_NAME = "Store State";
public static final PropertyDescriptor STORE_STATE = new PropertyDescriptor.Builder()
.name("Store State")
.name(STORE_STATE_NAME)
.displayName(STORE_STATE_NAME)
.description("Select whether or not state will be stored. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " +
"FlowFile in a stateless manner. Selecting a stateful option will not only store the attributes on the FlowFile but also in the Processors " +
"state. See the 'Stateful Usage' topic of the 'Additional Details' section of this processor's documentation for more information")
@ -161,14 +166,20 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
.allowableValues(DO_NOT_STORE_STATE, STORE_STATE_LOCALLY)
.defaultValue(DO_NOT_STORE_STATE)
.build();
public static final String STATEFUL_VARIABLES_INIT_VALUE_NAME = "Stateful Variables Initial Value";
public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder()
.name("Stateful Variables Initial Value")
.name(STATEFUL_VARIABLES_INIT_VALUE_NAME)
.displayName(STATEFUL_VARIABLES_INIT_VALUE_NAME)
.description("If using state to set/reference variables then this value is used to set the initial value of the stateful variable. This will only be used in the @OnScheduled method " +
"when state does not contain a value for the variable. This is required if running statefully but can be empty if needed.")
.required(false)
.addValidator(Validator.VALID)
.build();
private volatile Map<String, Action> defaultActions;
private volatile boolean debugEnabled;
public UpdateAttribute() {
relationships = statelessRelationshipSet;
@ -259,6 +270,14 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
context.getStateManager().setState(tempMap, Scope.LOCAL);
}
defaultActions = getDefaultActions(context.getProperties());
debugEnabled = getLogger().isDebugEnabled();
}
/**
 * Lifecycle callback registered through the {@code @OnStopped} annotation.
 * Releases the cached default actions by resetting the {@code defaultActions} field to {@code null}.
 */
@OnStopped
public void onStopped() {
defaultActions = null;
}
@Override
@ -397,11 +416,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
return;
}
final Map<PropertyDescriptor, String> properties = context.getProperties();
// get the default actions
final Map<String, Action> defaultActions = getDefaultActions(properties);
// record which rule should be applied to which flow file - when operating
// in 'use clone' mode, this collection will contain a number of entries
// that map to single element lists. this is because the original flowfile
@ -427,6 +441,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
return;
}
Map<String, Action> defaultActions = this.defaultActions;
// if there is update criteria specified, evaluate it
if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules, statefulAttributes)) {
// apply the actions for each rule and transfer the flowfile
@ -493,7 +509,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
rulesForFlowFile.add(rule);
// log if appropriate
if (logger.isDebugEnabled()) {
if (debugEnabled) {
logger.debug(this + " all conditions met for rule '" + rule.getName() + "'. Using flow file - " + flowfileToUse);
}
}
@ -517,16 +533,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
/**
 * Returns the {@link PropertyValue} associated with the given expression text, using the
 * {@code propertyValues} map as a cache so that a value is only created from the context
 * when no entry for the text exists yet.
 *
 * @param text the raw property/expression text used as the cache key
 * @param context the process context used to create a new {@link PropertyValue} on a cache miss
 * @return the cached or newly created {@link PropertyValue} for {@code text}
 */
private PropertyValue getPropertyValue(final String text, final ProcessContext context) {
// NOTE(review): both a manual get/putIfAbsent sequence and a computeIfAbsent call appear below;
// they look like the before/after forms of the same lookup - confirm which one is current.
PropertyValue currentValue = propertyValues.get(text);
if (currentValue == null) {
currentValue = context.newPropertyValue(text);
// keep whichever value was stored first if another caller populated the entry in the meantime
PropertyValue previousValue = propertyValues.putIfAbsent(text, currentValue);
if (previousValue != null) {
currentValue = previousValue;
}
}
return currentValue;
return propertyValues.computeIfAbsent(text, k -> context.newPropertyValue(text));
}
// Evaluates the specified condition on the specified flowfile.
@ -580,29 +587,11 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
statefulAttributesToSet = null;
}
// go through each action
boolean debugEnabled = this.debugEnabled;
for (final Action action : actions.values()) {
if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) {
try {
final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue();
// log if appropriate
if (logger.isDebugEnabled()) {
logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName));
}
if (statefulAttributesToSet != null) {
if(!action.getAttribute().equals("UpdateAttribute.matchedRule")) {
statefulAttributesToSet.put(action.getAttribute(), newAttributeValue);
}
}
attributesToUpdate.put(action.getAttribute(), newAttributeValue);
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe);
}
} else {
String attribute = action.getAttribute();
if (DELETE_ATTRIBUTES_EXPRESSION_NAME.equals(attribute)) {
try {
final String actionValue = action.getValue();
final String regex = (actionValue == null) ? null :
@ -614,17 +603,43 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
if (pattern.matcher(key).matches()) {
// log if appropriate
if (logger.isDebugEnabled()) {
logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this,
key, flowfile, regex));
if (debugEnabled) {
logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this, key, flowfile, regex));
}
attributesToDelete.add(key);
}
}
// No point in updating if they will be removed
attributesToUpdate.keySet().removeAll(attributesToDelete);
}
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", action.getAttribute(), pe), pe);
throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", attribute, pe), pe);
}
} else {
boolean notDeleted = !attributesToDelete.contains(attribute);
boolean setStatefulAttribute = statefulAttributesToSet != null && !attribute.equals("UpdateAttribute.matchedRule");
if (notDeleted || setStatefulAttribute) {
try {
final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue();
// log if appropriate
if (debugEnabled) {
logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, attribute, newAttributeValue, flowfile, ruleName));
}
if (setStatefulAttribute) {
statefulAttributesToSet.put(attribute, newAttributeValue);
}
// No point in updating if it will be removed
if (notDeleted) {
attributesToUpdate.put(attribute, newAttributeValue);
}
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", attribute, pe), pe);
}
}
}
}
@ -644,7 +659,15 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
// update and delete the FlowFile attributes
FlowFile returnFlowfile = session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete);
FlowFile returnFlowfile = flowfile;
if (attributesToUpdate.size() > 0) {
returnFlowfile = session.putAllAttributes(returnFlowfile, attributesToUpdate);
}
if (attributesToDelete.size() > 0) {
returnFlowfile = session.removeAllAttributes(returnFlowfile, attributesToDelete);
}
if(statefulAttributesToSet != null) {
context.getStateManager().setState(statefulAttributesToSet, Scope.LOCAL);

View File

@ -32,6 +32,12 @@
provide a regular expression and any attributes with a matching name will be deleted.
</p>
<p>
Please note that "Delete Attributes Expression" supersedes any updates that occur. If an existing attribute matches the "Delete Attributes Expression", it will be removed whether it
was updated or not. The "Delete Attributes Expression" only applies to attributes that exist in the input FlowFile; if an attribute is added by this processor, the "Delete Attributes Expression"
will not detect it.
</p>
<p>
<strong>Basic Usage</strong>
</p>