This commit is contained in:
Mark Payne 2015-09-25 09:15:34 -04:00
commit 96764ed6a1
4 changed files with 203 additions and 36 deletions

View File

@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.jetty.util.StringUtil;
@EventDriven
@SideEffectFree
@ -79,6 +80,14 @@ public class LogAttribute extends AbstractProcessor {
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
.name("Log prefix")
.required(false)
.description("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final String FIFTY_DASHES = "--------------------------------------------------";
public static enum DebugLevels {
@ -107,6 +116,7 @@ public class LogAttribute extends AbstractProcessor {
supDescriptors.add(LOG_PAYLOAD);
supDescriptors.add(ATTRIBUTES_TO_LOG_CSV);
supDescriptors.add(ATTRIBUTES_TO_IGNORE_CSV);
supDescriptors.add(LOG_PREFIX);
supportedDescriptors = Collections.unmodifiableList(supDescriptors);
}
@ -123,12 +133,26 @@ public class LogAttribute extends AbstractProcessor {
protected String processFlowFile(final ProcessorLog logger, final DebugLevels logLevel, final FlowFile flowFile, final ProcessSession session, final ProcessContext context) {
final Set<String> attributeKeys = getAttributesToLog(flowFile.getAttributes().keySet(), context);
final ProcessorLog LOG = getLogger();
final String dashedLine;
String logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtil.isBlank(logPrefix)) {
dashedLine = StringUtils.repeat('-', 50);
} else {
// abbreviate long lines
logPrefix = StringUtils.abbreviate(logPrefix, 40);
// center the logPrefix and pad with dashes
logPrefix = StringUtils.center(logPrefix, 40, '-');
// five dashes on the left and right side, plus the dashed logPrefix
dashedLine = StringUtils.repeat('-', 5) + logPrefix + StringUtils.repeat('-', 5);
}
// Pretty print metadata
final StringBuilder message = new StringBuilder();
message.append("logging for flow file ").append(flowFile);
message.append("\n");
message.append(FIFTY_DASHES);
message.append(dashedLine);
message.append("\nStandard FlowFile Attributes");
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "entryDate", new Date(flowFile.getEntryDate())));
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "lineageStartDate", new Date(flowFile.getLineageStartDate())));
@ -138,7 +162,7 @@ public class LogAttribute extends AbstractProcessor {
message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", key, flowFile.getAttribute(key)));
}
message.append("\n");
message.append(FIFTY_DASHES);
message.append(dashedLine);
// The user can request to log the payload
final boolean logPayload = context.getProperty(LOG_PAYLOAD).asBoolean();

View File

@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
@ -64,7 +65,10 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
/**
* This processor supports updating flowfile attributes and can do so
* conditionally or unconditionally. Like the FlowFileMetadataEnhancer, it can
* conditionally or unconditionally. It can also delete flowfile attributes
* that match a regular expression.
*
* Like the FlowFileMetadataEnhancer, it can
* be configured with an arbitrary number of optional properties to define how
* attributes should be updated. Each optional property represents an action
* that is applied to all incoming flow files. An action is comprised of an
@ -112,11 +116,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
*/
@EventDriven
@SideEffectFree
@Tags({"attributes", "modification", "update", "Attribute Expression Language"})
@CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language")
@Tags({"attributes", "modification", "update", "delete", "Attribute Expression Language"})
@CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language and/or deletes the attributes based on a regular expression")
@DynamicProperty(name = "A FlowFile attribute to update", value = "The value to set it to", supportsExpressionLanguage = true,
description = "Updates a FlowFile attribute specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")
@WritesAttribute(attribute = "See additional details", description = "This processor may write zero or more attributes as described in additional details")
@WritesAttribute(attribute = "See additional details", description = "This processor may write or remove zero or more attributes as described in additional details")
public class UpdateAttribute extends AbstractProcessor implements Searchable {
private final AtomicReference<Criteria> criteriaCache = new AtomicReference<>(null);
@ -124,6 +128,14 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
private final Set<Relationship> relationships;
// static properties
public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("Delete Attributes Expression")
.description("Regular expression for attributes to be deleted from flowfiles.")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("All FlowFiles are routed to this relationship").name("success").build();
@ -139,6 +151,13 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(DELETE_ATTRIBUTES);
return Collections.unmodifiableList(descriptors);
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
@ -435,40 +454,65 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
// attribute values that will be applied to the flow file
final Map<String, String> attributes = new HashMap<>(actions.size());
final Map<String, String> attributesToUpdate = new HashMap<>(actions.size());
final Set<String> attributesToDelete = new HashSet<>(actions.size());
// go through each action
for (final Action action : actions.values()) {
try {
final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile).getValue();
if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) {
try {
final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile).getValue();
// log if appropriate
if (logger.isDebugEnabled()) {
logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName));
// log if appropriate
if (logger.isDebugEnabled()) {
logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName));
}
attributesToUpdate.put(action.getAttribute(), newAttributeValue);
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe);
}
} else {
try {
final String regex = action.getValue();
if (regex != null) {
Pattern pattern = Pattern.compile(regex);
final Set<String> attributeKeys = flowfile.getAttributes().keySet();
for (final String key : attributeKeys) {
if (pattern.matcher(key).matches()) {
attributes.put(action.getAttribute(), newAttributeValue);
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe);
// log if appropriate
if (logger.isDebugEnabled()) {
logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this,
key, flowfile, regex));
}
attributesToDelete.add(key);
}
}
}
} catch (final ProcessException pe) {
throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", action.getAttribute(), pe), pe);
}
}
}
// If the 'alternate.identifier' attribute is added, then we want to create an ADD_INFO provenance event.
final String alternateIdentifier = attributes.get(CoreAttributes.ALTERNATE_IDENTIFIER.key());
if (alternateIdentifier != null) {
final String alternateIdentifierAdd = attributesToUpdate.get(CoreAttributes.ALTERNATE_IDENTIFIER.key());
if (alternateIdentifierAdd != null) {
try {
final URI uri = new URI(alternateIdentifier);
final URI uri = new URI(alternateIdentifierAdd);
final String namespace = uri.getScheme();
if (namespace != null) {
final String identifier = alternateIdentifier.substring(Math.min(namespace.length() + 1, alternateIdentifier.length() - 1));
final String identifier = alternateIdentifierAdd.substring(Math.min(namespace.length() + 1, alternateIdentifierAdd.length() - 1));
session.getProvenanceReporter().associate(flowfile, namespace, identifier);
}
} catch (final URISyntaxException e) {
}
}
// update the flowfile attributes
return session.putAllAttributes(flowfile, attributes);
// update and delete the flowfile attributes
return session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete);
}
// Gets the default actions.

View File

@ -26,9 +26,10 @@
<h2>Description:</h2>
<p>
This processor updates the attributes of a FlowFile using properties or rules that are added by the user.
There are two ways to use this processor to add or modify attributes. One way is the "Basic Usage"; this allows you to set default attribute changes that affect
There are three ways to use this processor to add or modify attributes. One way is the "Basic Usage"; this allows you to set default attribute changes that affect
every FlowFile going through the processor. The second way is the "Advanced Usage"; this allows you to make conditional attribute changes that only affect a FlowFile if it
meets certain conditions. It is possible to use both methods in the same processor at the same time.
meets certain conditions. It is possible to use both methods in the same processor at the same time. The third way is the "Delete Attributes Expression"; this allows you to
provide a regular expression and any attributes with a matching name will be deleted.
</p>
<p>
@ -189,6 +190,26 @@
a type of "else" construct. In other words, if none of the rules match for the attribute, then the basic usage changes will be made.
</p>
<p><strong>Deleting Attributes</strong></p>
<p>
Deleting attributes is a simple as providing a regular expression for attribute names to be deleted. This can be a simple regular expression that will
match a single attribute or more complex regular expression to match a group of similarly named attributes or even seveal individual attribute names.
</p>
<ul>
<li><strong>lastUser</strong> - will delete an attribute with the name "lastUser".
</li>
<li><strong>user.*</strong> - will delete attributes beginning with "user", including for example "username, "userName", "userID", and "users". But
it will not delete "User" or "localuser".
</li>
<li><strong>(user.*|host.*|.*Date)</strong> - will delete "user", "username", "userName", "hostInfo", "hosts", and "updateDate", but not "User", "HOST", "update", or "updatedate".
</li>
</ul>
<p>
The delete attributes function does not produce a Provenance Event if the <strong>alternate.identified</strong> Attribute is deleted.
</p>
<p><strong>FlowFile Policy</strong>
</p>

View File

@ -16,11 +16,6 @@
*/
package org.apache.nifi.update.attributes;
import org.apache.nifi.update.attributes.FlowFilePolicy;
import org.apache.nifi.update.attributes.Criteria;
import org.apache.nifi.update.attributes.Condition;
import org.apache.nifi.update.attributes.Action;
import org.apache.nifi.update.attributes.Rule;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
@ -56,8 +51,7 @@ public class TestUpdateAttribute {
}
private Criteria getCriteria() {
final Criteria criteria = new Criteria();
return criteria;
return new Criteria();
}
private void addRule(final Criteria criteria, final String name, final Collection<String> conditions, final Map<String, String> actions) {
@ -205,8 +199,8 @@ public class TestUpdateAttribute {
addRule(criteria, "rule", Arrays.asList(
// conditions
"${attribute.1:equals('value.1')}"), getMap(
// actions
"attribute.2", "value.2"));
// actions
"attribute.2", "value.2"));
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setAnnotationData(serialize(criteria));
@ -230,8 +224,8 @@ public class TestUpdateAttribute {
addRule(criteria, "rule", Arrays.asList(
// conditions
"${attribute.1:equals('value.1')}"), getMap(
// actions
"attribute.2", "value.2"));
// actions
"attribute.2", "value.2"));
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setAnnotationData(serialize(criteria));
@ -358,8 +352,8 @@ public class TestUpdateAttribute {
addRule(criteria, "rule 3", Arrays.asList(
// conditions
"${attribute.1:equals('value.1')}"), getMap(
// actions
"attribute.2", "value.3"));
// actions
"attribute.2", "value.3"));
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setAnnotationData(serialize(criteria));
@ -422,4 +416,88 @@ public class TestUpdateAttribute {
// ensure the attributes are as expected
flowfile.assertAttributeEquals("default.attr", "-more-stuff");
}
@Test
public void testSimpleDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "attribute.2");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
attributes.put("attribute.2", "value.2");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
result.get(0).assertAttributeEquals("attribute.1", "value.1");
result.get(0).assertAttributeNotExists("attribute.2");
}
@Test
public void testRegexDotDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "attribute.2");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
attributes.put("attribute.2", "value.2");
attributes.put("attributex2", "valuex2");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
result.get(0).assertAttributeEquals("attribute.1", "value.1");
result.get(0).assertAttributeNotExists("attribute.2");
result.get(0).assertAttributeNotExists("attributex2");
}
@Test
public void testRegexLiteralDotDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "attribute\\.2");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
attributes.put("attribute.2", "value.2");
attributes.put("attributex2", "valuex2");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
result.get(0).assertAttributeEquals("attribute.1", "value.1");
result.get(0).assertAttributeNotExists("attribute.2");
result.get(0).assertAttributeExists("attributex2");
}
@Test
public void testRegexGroupDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty("Delete Attributes Expression", "(attribute\\.[2-5]|sample.*)");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
attributes.put("attribute.2", "value.2");
attributes.put("attribute.6", "value.6");
attributes.put("sampleSize", "value.size");
attributes.put("sample.1", "value.sample.1");
attributes.put("simple.1", "value.simple.1");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
result.get(0).assertAttributeEquals("attribute.1", "value.1");
result.get(0).assertAttributeNotExists("attribute.2");
result.get(0).assertAttributeExists("attribute.6");
result.get(0).assertAttributeNotExists("sampleSize");
result.get(0).assertAttributeNotExists("sample.1");
result.get(0).assertAttributeExists("simple.1");
}
}