This commit is contained in:
joewitt 2015-04-25 09:37:02 -04:00
parent 8f2502c4e4
commit 87e829682f
2 changed files with 22 additions and 47 deletions

View File

@ -114,8 +114,8 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
@SideEffectFree @SideEffectFree
@Tags({"attributes", "modification", "update", "Attribute Expression Language"}) @Tags({"attributes", "modification", "update", "Attribute Expression Language"})
@CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language") @CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language")
@DynamicProperty(name="A FlowFile attribute to update", value="The value to set it to", supportsExpressionLanguage=true, description="Updates a " + @DynamicProperty(name = "A FlowFile attribute to update", value = "The value to set it to", supportsExpressionLanguage = true,
"FlowFile attribute specified by the Dynamic Property's key with the value specified by the Dynamic Property's value") description = "Updates a FlowFile attribute specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")
@WritesAttribute(attribute = "See additional details", description = "This processor may write zero or more attributes as described in additional details") @WritesAttribute(attribute = "See additional details", description = "This processor may write zero or more attributes as described in additional details")
public class UpdateAttribute extends AbstractProcessor implements Searchable { public class UpdateAttribute extends AbstractProcessor implements Searchable {
@ -190,7 +190,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("No expression for a condition in rule '%s' was found.", rule.getName())).build()); reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("No expression for a condition in rule '%s' was found.", rule.getName())).build());
} else { } else {
final String expression = condition.getExpression().trim(); final String expression = condition.getExpression().trim();
reasons.add(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false).validate(String.format("Condition for rule '%s'.", rule.getName()), expression, context)); reasons.add(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false)
.validate(String.format("Condition for rule '%s'.", rule.getName()), expression, context));
} }
} }
} }
@ -204,9 +205,11 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
if (action.getAttribute() == null) { if (action.getAttribute() == null) {
reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("An action in rule '%s' is missing the attribute name.", rule.getName())).build()); reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("An action in rule '%s' is missing the attribute name.", rule.getName())).build());
} else if (action.getValue() == null) { } else if (action.getValue() == null) {
reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("No value for attribute '%s' in rule '%s' was found.", action.getAttribute(), rule.getName())).build()); reasons.add(new ValidationResult.Builder().valid(false)
.explanation(String.format("No value for attribute '%s' in rule '%s' was found.", action.getAttribute(), rule.getName())).build());
} else { } else {
reasons.add(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true).validate(String.format("Action for rule '%s'.", rule.getName()), action.getValue(), context)); reasons.add(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)
.validate(String.format("Action for rule '%s'.", rule.getName()), action.getValue(), context));
} }
} }
} }
@ -328,15 +331,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
} }
} }
/** //Evaluates the specified Criteria on the specified flowfile. Clones the
* Evaluates the specified Criteria on the specified flowfile. Clones the // specified flow file for each rule that is applied.
* specified flow file for each rule that is applied. Returns a mapping of
* rules to flow files.
*
* @param criteria
* @param original
* @return
*/
private boolean evaluateCriteria(final ProcessSession session, final ProcessContext context, final Criteria criteria, final FlowFile flowfile, final Map<FlowFile, List<Rule>> matchedRules) { private boolean evaluateCriteria(final ProcessSession session, final ProcessContext context, final Criteria criteria, final FlowFile flowfile, final Map<FlowFile, List<Rule>> matchedRules) {
final ProcessorLog logger = getLogger(); final ProcessorLog logger = getLogger();
final List<Rule> rules = criteria.getRules(); final List<Rule> rules = criteria.getRules();
@ -373,13 +369,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
return !matchedRules.isEmpty(); return !matchedRules.isEmpty();
} }
/** //Evaluates the specified rule on the specified flowfile.
* Evaluates the specified rule on the specified flowfile.
*
* @param rule
* @param flowfile
* @return
*/
private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile) { private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile) {
// go through each condition // go through each condition
for (final Condition condition : rule.getConditions()) { for (final Condition condition : rule.getConditions()) {
@ -406,13 +396,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
return currentValue; return currentValue;
} }
/** //Evaluates the specified condition on the specified flowfile.
* Evaluates the specified condition on the specified flowfile.
*
* @param condition
* @param flowfile
* @return
*/
private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile) { private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile) {
try { try {
// evaluate the expression for the given flow file // evaluate the expression for the given flow file
@ -422,12 +406,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
} }
} }
/** // Executes the specified action on the specified flowfile.
* Executes the specified action on the specified flowfile.
*
* @param action
* @param flowfile
*/
private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List<Rule> rules, final Map<String, Action> defaultActions, final FlowFile flowfile) { private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List<Rule> rules, final Map<String, Action> defaultActions, final FlowFile flowfile) {
final ProcessorLog logger = getLogger(); final ProcessorLog logger = getLogger();
final Map<String, Action> actions = new HashMap<>(defaultActions); final Map<String, Action> actions = new HashMap<>(defaultActions);
@ -445,9 +424,9 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
} }
// add an action for the matched rule - when matching multiple rules against // add an action for the matched rule - when matching multiple rules against
// the original flowfile (use original) this will leave the last matching // the original flowfile (use original) this will leave the last matching
// rule's name as the value of this attribute. this decision was made since // rule's name as the value of this attribute. this decision was made since
// this would be the behavior if they user chained multiple UpdateAttributes // this would be the behavior if they user chained multiple UpdateAttributes
// together with 'use clone' specified // together with 'use clone' specified
final Action matchedRuleAction = new Action(); final Action matchedRuleAction = new Action();
matchedRuleAction.setAttribute(getClass().getSimpleName() + ".matchedRule"); matchedRuleAction.setAttribute(getClass().getSimpleName() + ".matchedRule");
@ -492,11 +471,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
return session.putAllAttributes(flowfile, attributes); return session.putAllAttributes(flowfile, attributes);
} }
/** // Gets the default actions.
* Gets the default actions.
*
* @return
*/
private Map<String, Action> getDefaultActions(final Map<PropertyDescriptor, String> properties) { private Map<String, Action> getDefaultActions(final Map<PropertyDescriptor, String> properties) {
final Map<String, Action> defaultActions = new HashMap<>(); final Map<String, Action> defaultActions = new HashMap<>();

View File

@ -101,7 +101,7 @@ public class RuleResource {
// build the web context config // build the web context config
final NiFiWebRequestContext contextConfig = getRequestContext(processorId); final NiFiWebRequestContext contextConfig = getRequestContext(processorId);
// load the criteria // load the criteria
final Criteria criteria = getCriteria(nifiWebContext, contextConfig); final Criteria criteria = getCriteria(nifiWebContext, contextConfig);
// create the response entity // create the response entity
@ -372,7 +372,7 @@ public class RuleResource {
// build the web context config // build the web context config
final NiFiWebRequestContext requestContext = getRequestContext(processorId); final NiFiWebRequestContext requestContext = getRequestContext(processorId);
// load the criteria // load the criteria
final Criteria criteria = getCriteria(configurationContext, requestContext); final Criteria criteria = getCriteria(configurationContext, requestContext);
final List<Rule> rules = criteria.getRules(); final List<Rule> rules = criteria.getRules();
@ -415,7 +415,7 @@ public class RuleResource {
// build the web context config // build the web context config
final NiFiWebRequestContext requestContext = getRequestContext(processorId); final NiFiWebRequestContext requestContext = getRequestContext(processorId);
// load the criteria // load the criteria
final Criteria criteria = getCriteria(configurationContext, requestContext); final Criteria criteria = getCriteria(configurationContext, requestContext);
final List<Rule> rules = criteria.getRules(); final List<Rule> rules = criteria.getRules();
@ -641,7 +641,7 @@ public class RuleResource {
throw new WebApplicationException(error(message)); throw new WebApplicationException(error(message));
} }
} }
private NiFiWebRequestContext getRequestContext(final String processorId) { private NiFiWebRequestContext getRequestContext(final String processorId) {
return new HttpServletRequestContext(UiExtensionType.ProcessorConfiguration, request) { return new HttpServletRequestContext(UiExtensionType.ProcessorConfiguration, request) {
@Override @Override
@ -650,7 +650,7 @@ public class RuleResource {
} }
}; };
} }
private NiFiWebConfigurationRequestContext getConfigurationRequestContext(final String processorId, final Long revision, final String clientId) { private NiFiWebConfigurationRequestContext getConfigurationRequestContext(final String processorId, final Long revision, final String clientId) {
return new HttpServletConfigurationRequestContext(UiExtensionType.ProcessorConfiguration, request) { return new HttpServletConfigurationRequestContext(UiExtensionType.ProcessorConfiguration, request) {
@Override @Override