mirror of https://github.com/apache/nifi.git
NIFI-1123 Adds expression language support to DeleteAttributesExpression on UpdateAttributes Processor.
Reviewed by Tony Kurc (trkurc@gmail.com) after Aldrin Piri <aldrin@apache.org> did the initial review and actionable comments
This commit is contained in:
parent
ab7940368a
commit
52b24b93d9
|
@ -45,6 +45,7 @@ import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.components.Validator;
|
||||||
import org.apache.nifi.expression.AttributeExpression;
|
import org.apache.nifi.expression.AttributeExpression;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
@ -131,12 +132,40 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
||||||
|
|
||||||
private final Set<Relationship> relationships;
|
private final Set<Relationship> relationships;
|
||||||
|
|
||||||
|
private static final Validator DELETE_PROPERTY_VALIDATOR = new Validator() {
|
||||||
|
private static final Validator DPV_RE_VALIDATOR = StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true);
|
||||||
|
@Override
|
||||||
|
public ValidationResult validate(String subject, String input, ValidationContext context) {
|
||||||
|
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
|
||||||
|
final AttributeExpression.ResultType resultType = context.newExpressionLanguageCompiler().getResultType(input);
|
||||||
|
if (!resultType.equals(AttributeExpression.ResultType.STRING)) {
|
||||||
|
return new ValidationResult.Builder()
|
||||||
|
.subject(subject)
|
||||||
|
.input(input)
|
||||||
|
.valid(false)
|
||||||
|
.explanation("Expected property to to return type " + AttributeExpression.ResultType.STRING +
|
||||||
|
" but expression returns type " + resultType)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
return new ValidationResult.Builder()
|
||||||
|
.subject(subject)
|
||||||
|
.input(input)
|
||||||
|
.valid(true)
|
||||||
|
.explanation("Property returns type " + AttributeExpression.ResultType.STRING)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
return DPV_RE_VALIDATOR.validate(subject, input, context);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// static properties
|
// static properties
|
||||||
public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
|
||||||
.name("Delete Attributes Expression")
|
.name("Delete Attributes Expression")
|
||||||
.description("Regular expression for attributes to be deleted from flowfiles.")
|
.description("Regular expression for attributes to be deleted from flowfiles.")
|
||||||
.required(false)
|
.required(false)
|
||||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
.addValidator(DELETE_PROPERTY_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// relationships
|
// relationships
|
||||||
|
@ -477,7 +506,9 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
final String regex = action.getValue();
|
final String actionValue = action.getValue();
|
||||||
|
final String regex = (actionValue == null) ? null :
|
||||||
|
getPropertyValue(actionValue, context).evaluateAttributeExpressions(flowfile).getValue();
|
||||||
if (regex != null) {
|
if (regex != null) {
|
||||||
Pattern pattern = Pattern.compile(regex);
|
Pattern pattern = Pattern.compile(regex);
|
||||||
final Set<String> attributeKeys = flowfile.getAttributes().keySet();
|
final Set<String> attributeKeys = flowfile.getAttributes().keySet();
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.regex.PatternSyntaxException;
|
||||||
|
|
||||||
import org.apache.nifi.processors.attributes.UpdateAttribute;
|
import org.apache.nifi.processors.attributes.UpdateAttribute;
|
||||||
import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
|
import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
|
||||||
|
@ -33,6 +34,8 @@ import org.apache.nifi.util.TestRunners;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@ -500,4 +503,131 @@ public class TestUpdateAttribute {
|
||||||
result.get(0).assertAttributeNotExists("sample.1");
|
result.get(0).assertAttributeNotExists("sample.1");
|
||||||
result.get(0).assertAttributeExists("simple.1");
|
result.get(0).assertAttributeExists("simple.1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAttributeKey() {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
|
||||||
|
runner.setProperty("Delete Attributes Expression", "(attribute\\.[2-5]|sample.*)");
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("attribute.1", "value.1");
|
||||||
|
attributes.put("attribute.2", "value.2");
|
||||||
|
attributes.put("attribute.6", "value.6");
|
||||||
|
attributes.put("sampleSize", "value.size");
|
||||||
|
attributes.put("sample.1", "value.sample.1");
|
||||||
|
attributes.put("simple.1", "value.simple.1");
|
||||||
|
runner.enqueue(new byte[0], attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
|
||||||
|
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
|
||||||
|
result.get(0).assertAttributeEquals("attribute.1", "value.1");
|
||||||
|
result.get(0).assertAttributeNotExists("attribute.2");
|
||||||
|
result.get(0).assertAttributeExists("attribute.6");
|
||||||
|
result.get(0).assertAttributeNotExists("sampleSize");
|
||||||
|
result.get(0).assertAttributeNotExists("sample.1");
|
||||||
|
result.get(0).assertAttributeExists("simple.1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpressionLiteralDelete() {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
|
||||||
|
runner.setProperty("Delete Attributes Expression", "${literal('attribute\\.'):append(${literal(6)})}");
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("attribute.1", "value.1");
|
||||||
|
attributes.put("attribute.2", "value.2");
|
||||||
|
attributes.put("attribute.6", "value.6");
|
||||||
|
attributes.put("sampleSize", "value.size");
|
||||||
|
attributes.put("sample.1", "value.sample.1");
|
||||||
|
attributes.put("simple.1", "value.simple.1");
|
||||||
|
runner.enqueue(new byte[0], attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
|
||||||
|
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
|
||||||
|
result.get(0).assertAttributeEquals("attribute.1", "value.1");
|
||||||
|
result.get(0).assertAttributeExists("attribute.2");
|
||||||
|
result.get(0).assertAttributeNotExists("attribute.6");
|
||||||
|
result.get(0).assertAttributeExists("sampleSize");
|
||||||
|
result.get(0).assertAttributeExists("sample.1");
|
||||||
|
result.get(0).assertAttributeExists("simple.1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpressionRegexDelete() {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
|
||||||
|
runner.setProperty("Delete Attributes Expression", "${literal('(attribute\\.'):append(${literal('[2-5]')}):append(${literal('|sample.*)')})}");
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("attribute.1", "value.1");
|
||||||
|
attributes.put("attribute.2", "value.2");
|
||||||
|
attributes.put("attribute.6", "value.6");
|
||||||
|
attributes.put("sampleSize", "value.size");
|
||||||
|
attributes.put("sample.1", "value.sample.1");
|
||||||
|
attributes.put("simple.1", "value.simple.1");
|
||||||
|
runner.enqueue(new byte[0], attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
|
||||||
|
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
|
||||||
|
result.get(0).assertAttributeEquals("attribute.1", "value.1");
|
||||||
|
result.get(0).assertAttributeNotExists("attribute.2");
|
||||||
|
result.get(0).assertAttributeExists("attribute.6");
|
||||||
|
result.get(0).assertAttributeNotExists("sampleSize");
|
||||||
|
result.get(0).assertAttributeNotExists("sample.1");
|
||||||
|
result.get(0).assertAttributeExists("simple.1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAttributeListDelete() {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
|
||||||
|
runner.setProperty("Delete Attributes Expression", "attribute.1|attribute.2|sample.1|simple.1");
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("attribute.1", "value.1");
|
||||||
|
attributes.put("attribute.2", "value.2");
|
||||||
|
attributes.put("attribute.6", "value.6");
|
||||||
|
attributes.put("sampleSize", "value.size");
|
||||||
|
attributes.put("sample.1", "value.sample.1");
|
||||||
|
attributes.put("simple.1", "value.simple.1");
|
||||||
|
runner.enqueue(new byte[0], attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
|
||||||
|
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
|
||||||
|
result.get(0).assertAttributeNotExists("attribute.1");
|
||||||
|
result.get(0).assertAttributeNotExists("attribute.2");
|
||||||
|
result.get(0).assertAttributeExists("attribute.6");
|
||||||
|
result.get(0).assertAttributeExists("sampleSize");
|
||||||
|
result.get(0).assertAttributeNotExists("sample.1");
|
||||||
|
result.get(0).assertAttributeNotExists("simple.1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidRegex() {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
|
||||||
|
runner.setProperty("Delete Attributes Expression", "(");
|
||||||
|
runner.assertNotValid();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidRegexInAttribute() {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
|
||||||
|
runner.setProperty("Delete Attributes Expression", "${butter}");
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("butter", "(");
|
||||||
|
runner.enqueue(new byte[0], attributes);
|
||||||
|
try {
|
||||||
|
runner.run();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
assertEquals(t.getCause().getClass(), PatternSyntaxException.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue