NIFI-1923 - AttributesToJSON regex property

Added Expression Language (EL) support to the regex property

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2099
This commit is contained in:
Pierre Villard 2017-08-18 12:19:11 +02:00 committed by Matthew Burgess
parent a53a37f9ca
commit e62417ea6b
2 changed files with 73 additions and 10 deletions

View File

@ -46,6 +46,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Collections; import java.util.Collections;
@ -79,6 +80,19 @@ public class AttributesToJSON extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
/**
 * Optional property holding a regular expression used to select flow file attributes
 * for the generated JSON.
 *
 * Within this processor the configured value is compiled into a {@link Pattern} and
 * applied to each attribute name with {@code Matcher.matches()}, so the expression has
 * to match the whole attribute name, not just a part of it. Attributes selected this
 * way are added to the ones named by the attributes list property; the two properties
 * can be set together.
 *
 * Expression Language is enabled; the expression is evaluated without a flow file
 * argument before the pattern is compiled.
 */
public static final PropertyDescriptor ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
.name("attributes-to-json-regex")
.displayName("Attributes Regular Expression")
.description("Regular expression that will be evaluated against the flow file attributes to select "
+ "the matching attributes. This property can be used in combination with the attributes "
+ "list property.")
.required(false)
.expressionLanguageSupported(true)
// NOTE(review): the arguments presumably mean "any number of capturing groups" and
// "validate with Expression Language support" - confirm against StandardValidators.
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
// Rejects an empty value when the property is set.
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
.name("Destination") .name("Destination")
.description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " + .description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " +
@ -120,11 +134,13 @@ public class AttributesToJSON extends AbstractProcessor {
private volatile Set<String> attributes; private volatile Set<String> attributes;
private volatile Boolean nullValueForEmptyString; private volatile Boolean nullValueForEmptyString;
private volatile boolean destinationContent; private volatile boolean destinationContent;
private volatile Pattern pattern;
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ATTRIBUTES_LIST); properties.add(ATTRIBUTES_LIST);
properties.add(ATTRIBUTES_REGEX);
properties.add(DESTINATION); properties.add(DESTINATION);
properties.add(INCLUDE_CORE_ATTRIBUTES); properties.add(INCLUDE_CORE_ATTRIBUTES);
properties.add(NULL_VALUE_FOR_EMPTY_STRING); properties.add(NULL_VALUE_FOR_EMPTY_STRING);
@ -153,17 +169,27 @@ public class AttributesToJSON extends AbstractProcessor {
* @return * @return
* Map of values that are feed to a Jackson ObjectMapper * Map of values that are feed to a Jackson ObjectMapper
*/ */
protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Set<String> attributesToRemove, boolean nullValForEmptyString) { protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Set<String> attributesToRemove,
boolean nullValForEmptyString, Pattern attPattern) {
Map<String, String> result; Map<String, String> result;
//If list of attributes specified get only those attributes. Otherwise write them all //If list of attributes specified get only those attributes. Otherwise write them all
if (attributes != null) { if (attributes != null || attPattern != null) {
result = new HashMap<>(attributes.size()); result = new HashMap<>();
for (String attribute : attributes) { if(attributes != null) {
String val = ff.getAttribute(attribute); for (String attribute : attributes) {
if (val != null || nullValForEmptyString) { String val = ff.getAttribute(attribute);
result.put(attribute, val); if (val != null || nullValForEmptyString) {
} else { result.put(attribute, val);
result.put(attribute, ""); } else {
result.put(attribute, "");
}
}
}
if(attPattern != null) {
for (Map.Entry<String, String> e : ff.getAttributes().entrySet()) {
if(attPattern.matcher(e.getKey()).matches()) {
result.put(e.getKey(), e.getValue());
}
} }
} }
} else { } else {
@ -204,6 +230,9 @@ public class AttributesToJSON extends AbstractProcessor {
attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue(), attributesToRemove); attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue(), attributesToRemove);
nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean(); nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue()); destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
if(context.getProperty(ATTRIBUTES_REGEX).isSet()) {
pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue());
}
} }
@Override @Override
@ -213,7 +242,7 @@ public class AttributesToJSON extends AbstractProcessor {
return; return;
} }
final Map<String, String> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString); final Map<String, String> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString, pattern);
try { try {
if (destinationContent) { if (destinationContent) {

View File

@ -388,4 +388,38 @@ public class TestAttributesToJSON {
Set<String> coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet()); Set<String> coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k))); val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k)));
} }
/**
 * Verifies that the regex property (supplied through an Expression Language variable)
 * and the attributes list property are combined: every attribute whose name matches
 * the regex and every attribute named in the list must appear in the JSON written to
 * the JSON attribute, while a non-matching, non-listed attribute must not.
 */
@Test
public void testAttributesRegex() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new AttributesToJSON());

    // The regex is resolved from a variable so that Expression Language evaluation is exercised.
    runner.setVariable("regex", "delimited\\.header\\.column\\.[0-9]+");
    runner.setProperty(AttributesToJSON.ATTRIBUTES_REGEX, "${regex}");
    runner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "test, test1");

    final Map<String, String> inputAttributes = new HashMap<>();
    inputAttributes.put("delimited.header.column.1", "Registry");
    inputAttributes.put("delimited.header.column.2", "Assignment");
    inputAttributes.put("delimited.header.column.3", "Organization Name");
    inputAttributes.put("delimited.header.column.4", "Organization Address");
    inputAttributes.put("delimited.footer.column.1", "not included");
    inputAttributes.put("test", "test");
    inputAttributes.put("test1", "test1");

    runner.enqueue("".getBytes(), inputAttributes);
    runner.run();

    runner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
    runner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);

    final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0);
    final String json = result.getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
    Map<String, String> val = new ObjectMapper().readValue(json, HashMap.class);

    // Selected by the regex (the four header columns) or by the explicit list (test, test1).
    final String[] expectedKeys = {
        "delimited.header.column.1",
        "delimited.header.column.2",
        "delimited.header.column.3",
        "delimited.header.column.4",
        "test",
        "test1"
    };
    for (final String key : expectedKeys) {
        assertTrue(val.containsKey(key));
    }

    // Neither matched by the regex nor present in the list.
    assertTrue(!val.containsKey("delimited.footer.column.1"));
}
} }