diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index c52eba53ab..dd812891c2 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; @@ -64,7 +65,10 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; /** * This processor supports updating flowfile attributes and can do so - * conditionally or unconditionally. Like the FlowFileMetadataEnhancer, it can + * conditionally or unconditionally. It can also delete flowfile attributes + * that match a regular expression. + * + * Like the FlowFileMetadataEnhancer, it can * be configured with an arbitrary number of optional properties to define how * attributes should be updated. Each optional property represents an action * that is applied to all incoming flow files. An action is comprised of an @@ -112,11 +116,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; */ @EventDriven @SideEffectFree -@Tags({"attributes", "modification", "update", "Attribute Expression Language"}) -@CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language") +@Tags({"attributes", "modification", "update", "delete", "Attribute Expression Language"}) +@CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language and/or deletes the attributes based on a regular expression") @DynamicProperty(name = "A FlowFile attribute to update", value = "The value to set it to", supportsExpressionLanguage = true, description = "Updates a FlowFile attribute specified by the Dynamic Property's key with the value specified by the Dynamic Property's value") -@WritesAttribute(attribute = "See additional details", description = "This processor may write zero or more attributes as described in additional details") +@WritesAttribute(attribute = "See additional details", description = "This processor may write or remove zero or more attributes as described in additional details") public class UpdateAttribute extends AbstractProcessor implements Searchable { private final AtomicReference criteriaCache = new AtomicReference<>(null); @@ -124,6 +128,14 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { private final Set relationships; + // static properties + public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder() + .name("Delete Attributes Expression") + .description("Regular expression for attributes to be deleted from flowfiles.") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .description("All FlowFiles are routed to this relationship").name("success").build(); @@ -139,6 +151,13 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { return relationships; } + @Override + protected List getSupportedPropertyDescriptors() { + List descriptors = new ArrayList<>(); + descriptors.add(DELETE_ATTRIBUTES); + return Collections.unmodifiableList(descriptors); + } + @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() @@ -435,40 +454,65 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } // attribute values that will be applied to the flow file - final Map attributes = new HashMap<>(actions.size()); + final Map attributesToUpdate = new HashMap<>(actions.size()); + final Set attributesToDelete = new HashSet<>(actions.size()); // go through each action for (final Action action : actions.values()) { - try { - final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile).getValue(); + if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) { + try { + final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile).getValue(); - // log if appropriate - if (logger.isDebugEnabled()) { - logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName)); + // log if appropriate + if (logger.isDebugEnabled()) { + logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName)); + } + + attributesToUpdate.put(action.getAttribute(), newAttributeValue); + } catch (final ProcessException pe) { + throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe); } + } else { + try { + final String regex = action.getValue(); + if (regex != null) { + Pattern pattern = Pattern.compile(regex); + final Set attributeKeys = flowfile.getAttributes().keySet(); + for (final String key : attributeKeys) { + if (pattern.matcher(key).matches()) { - attributes.put(action.getAttribute(), newAttributeValue); - } catch (final ProcessException pe) { - throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe); + // log if appropriate + if (logger.isDebugEnabled()) { + logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this, + key, flowfile, regex)); + } + + attributesToDelete.add(key); + } + } + } + } catch (final ProcessException pe) { + throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", action.getAttribute(), pe), pe); + } } } // If the 'alternate.identifier' attribute is added, then we want to create an ADD_INFO provenance event. - final String alternateIdentifier = attributes.get(CoreAttributes.ALTERNATE_IDENTIFIER.key()); - if (alternateIdentifier != null) { + final String alternateIdentifierAdd = attributesToUpdate.get(CoreAttributes.ALTERNATE_IDENTIFIER.key()); + if (alternateIdentifierAdd != null) { try { - final URI uri = new URI(alternateIdentifier); + final URI uri = new URI(alternateIdentifierAdd); final String namespace = uri.getScheme(); if (namespace != null) { - final String identifier = alternateIdentifier.substring(Math.min(namespace.length() + 1, alternateIdentifier.length() - 1)); + final String identifier = alternateIdentifierAdd.substring(Math.min(namespace.length() + 1, alternateIdentifierAdd.length() - 1)); session.getProvenanceReporter().associate(flowfile, namespace, identifier); } } catch (final URISyntaxException e) { } } - // update the flowfile attributes - return session.putAllAttributes(flowfile, attributes); + // update and delete the flowfile attributes + return session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete); } // Gets the default actions. diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html index 30cfce9d67..cb23635dcc 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html @@ -26,9 +26,10 @@

Description:

This processor updates the attributes of a FlowFile using properties or rules that are added by the user. - There are two ways to use this processor to add or modify attributes. One way is the "Basic Usage"; this allows you to set default attribute changes that affect + There are three ways to use this processor to add or modify attributes. One way is the "Basic Usage"; this allows you to set default attribute changes that affect every FlowFile going through the processor. The second way is the "Advanced Usage"; this allows you to make conditional attribute changes that only affect a FlowFile if it - meets certain conditions. It is possible to use both methods in the same processor at the same time. + meets certain conditions. It is possible to use both methods in the same processor at the same time. The third way is the "Delete Attributes Expression"; this allows you to + provide a regular expression and any attributes with a matching name will be deleted.

@@ -189,6 +190,26 @@ a type of "else" construct. In other words, if none of the rules match for the attribute, then the basic usage changes will be made.

+

Deleting Attributes

+ +

+ Deleting attributes is a simple as providing a regular expression for attribute names to be deleted. This can be a simple regular expression that will + match a single attribute or more complex regular expression to match a group of similarly named attributes or even seveal individual attribute names. +

+
    +
  • lastUser - will delete an attribute with the name "lastUser". +
  • +
  • user.* - will delete attributes beginning with "user", including for example "username, "userName", "userID", and "users". But + it will not delete "User" or "localuser". +
  • +
  • (user.*|host.*|.*Date) - will delete "user", "username", "userName", "hostInfo", "hosts", and "updateDate", but not "User", "HOST", "update", or "updatedate". +
  • +
+ +

+ The delete attributes function does not produce a Provenance Event if the alternate.identified Attribute is deleted. +

+

FlowFile Policy

diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index a5d74f778f..f1b75edf21 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -16,11 +16,6 @@ */ package org.apache.nifi.update.attributes; -import org.apache.nifi.update.attributes.FlowFilePolicy; -import org.apache.nifi.update.attributes.Criteria; -import org.apache.nifi.update.attributes.Condition; -import org.apache.nifi.update.attributes.Action; -import org.apache.nifi.update.attributes.Rule; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; @@ -56,8 +51,7 @@ public class TestUpdateAttribute { } private Criteria getCriteria() { - final Criteria criteria = new Criteria(); - return criteria; + return new Criteria(); } private void addRule(final Criteria criteria, final String name, final Collection conditions, final Map actions) { @@ -205,8 +199,8 @@ public class TestUpdateAttribute { addRule(criteria, "rule", Arrays.asList( // conditions "${attribute.1:equals('value.1')}"), getMap( - // actions - "attribute.2", "value.2")); + // actions + "attribute.2", "value.2")); final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); runner.setAnnotationData(serialize(criteria)); @@ -230,8 +224,8 @@ public class TestUpdateAttribute { addRule(criteria, "rule", Arrays.asList( // conditions "${attribute.1:equals('value.1')}"), getMap( - // actions - "attribute.2", "value.2")); + // actions + "attribute.2", "value.2")); final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); runner.setAnnotationData(serialize(criteria)); @@ -358,8 +352,8 @@ public class TestUpdateAttribute { addRule(criteria, "rule 3", Arrays.asList( // conditions "${attribute.1:equals('value.1')}"), getMap( - // actions - "attribute.2", "value.3")); + // actions + "attribute.2", "value.3")); final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); runner.setAnnotationData(serialize(criteria)); @@ -422,4 +416,88 @@ public class TestUpdateAttribute { // ensure the attributes are as expected flowfile.assertAttributeEquals("default.attr", "-more-stuff"); } + + @Test + public void testSimpleDelete() { + final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty("Delete Attributes Expression", "attribute.2"); + + final Map attributes = new HashMap<>(); + attributes.put("attribute.1", "value.1"); + attributes.put("attribute.2", "value.2"); + runner.enqueue(new byte[0], attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1); + final List result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS); + result.get(0).assertAttributeEquals("attribute.1", "value.1"); + result.get(0).assertAttributeNotExists("attribute.2"); + } + + @Test + public void testRegexDotDelete() { + final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty("Delete Attributes Expression", "attribute.2"); + + final Map attributes = new HashMap<>(); + attributes.put("attribute.1", "value.1"); + attributes.put("attribute.2", "value.2"); + attributes.put("attributex2", "valuex2"); + runner.enqueue(new byte[0], attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1); + final List result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS); + result.get(0).assertAttributeEquals("attribute.1", "value.1"); + result.get(0).assertAttributeNotExists("attribute.2"); + result.get(0).assertAttributeNotExists("attributex2"); + } + + @Test + public void testRegexLiteralDotDelete() { + final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty("Delete Attributes Expression", "attribute\\.2"); + + final Map attributes = new HashMap<>(); + attributes.put("attribute.1", "value.1"); + attributes.put("attribute.2", "value.2"); + attributes.put("attributex2", "valuex2"); + runner.enqueue(new byte[0], attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1); + final List result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS); + result.get(0).assertAttributeEquals("attribute.1", "value.1"); + result.get(0).assertAttributeNotExists("attribute.2"); + result.get(0).assertAttributeExists("attributex2"); + } + + @Test + public void testRegexGroupDelete() { + final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty("Delete Attributes Expression", "(attribute\\.[2-5]|sample.*)"); + + final Map attributes = new HashMap<>(); + attributes.put("attribute.1", "value.1"); + attributes.put("attribute.2", "value.2"); + attributes.put("attribute.6", "value.6"); + attributes.put("sampleSize", "value.size"); + attributes.put("sample.1", "value.sample.1"); + attributes.put("simple.1", "value.simple.1"); + runner.enqueue(new byte[0], attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1); + final List result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS); + result.get(0).assertAttributeEquals("attribute.1", "value.1"); + result.get(0).assertAttributeNotExists("attribute.2"); + result.get(0).assertAttributeExists("attribute.6"); + result.get(0).assertAttributeNotExists("sampleSize"); + result.get(0).assertAttributeNotExists("sample.1"); + result.get(0).assertAttributeExists("simple.1"); + } }