NIFI-2964 Added JSON Handling Strategy property to AttributesToJSON

This closes #7231

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2023-05-05 16:39:36 +00:00 committed by exceptionfactory
parent a446af8aaa
commit d6600e67cb
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 194 additions and 17 deletions

View File

@ -21,36 +21,37 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.HashSet;
import java.util.Map;
import java.util.Collections;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@EventDriven @EventDriven
@ -59,9 +60,37 @@ import java.util.stream.Collectors;
@Tags({"json", "attributes", "flowfile"}) @Tags({"json", "attributes", "flowfile"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Generates a JSON representation of the input FlowFile Attributes. The resulting JSON " + @CapabilityDescription("Generates a JSON representation of the input FlowFile Attributes. The resulting JSON " +
"can be written to either a new Attribute 'JSONAttributes' or written to the FlowFile as content.") "can be written to either a new Attribute 'JSONAttributes' or written to the FlowFile as content. Attributes " +
" which contain nested JSON objects can either be handled as JSON or as escaped JSON depending on the strategy chosen.")
@WritesAttribute(attribute = "JSONAttributes", description = "JSON representation of Attributes") @WritesAttribute(attribute = "JSONAttributes", description = "JSON representation of Attributes")
public class AttributesToJSON extends AbstractProcessor { public class AttributesToJSON extends AbstractProcessor {
public enum JsonHandlingStrategy implements DescribedValue {
ESCAPED("Escaped", "Escapes JSON attribute values to strings"),
NESTED("Nested", "Handles JSON attribute values as nested structured objects or arrays");
JsonHandlingStrategy(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
private final String displayName;
private final String description;
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}
public static final String JSON_ATTRIBUTE_NAME = "JSONAttributes"; public static final String JSON_ATTRIBUTE_NAME = "JSONAttributes";
private static final String AT_LIST_SEPARATOR = ","; private static final String AT_LIST_SEPARATOR = ",";
@ -122,6 +151,15 @@ public class AttributesToJSON extends AbstractProcessor {
.defaultValue("false") .defaultValue("false")
.build(); .build();
public static final PropertyDescriptor JSON_HANDLING_STRATEGY = new PropertyDescriptor.Builder()
.name("JSON Handling Strategy")
.displayName("JSON Handling Strategy")
.description("Strategy to use for handling attributes which contain nested JSON.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(JsonHandlingStrategy.class)
.defaultValue(AttributesToJSON.JsonHandlingStrategy.ESCAPED.getValue())
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successfully converted attributes to JSON").build(); .description("Successfully converted attributes to JSON").build();
@ -136,6 +174,7 @@ public class AttributesToJSON extends AbstractProcessor {
private volatile Boolean nullValueForEmptyString; private volatile Boolean nullValueForEmptyString;
private volatile boolean destinationContent; private volatile boolean destinationContent;
private volatile Pattern pattern; private volatile Pattern pattern;
private volatile JsonHandlingStrategy jsonHandlingStrategy;
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
@ -145,6 +184,7 @@ public class AttributesToJSON extends AbstractProcessor {
properties.add(DESTINATION); properties.add(DESTINATION);
properties.add(INCLUDE_CORE_ATTRIBUTES); properties.add(INCLUDE_CORE_ATTRIBUTES);
properties.add(NULL_VALUE_FOR_EMPTY_STRING); properties.add(NULL_VALUE_FOR_EMPTY_STRING);
properties.add(JSON_HANDLING_STRATEGY);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> relationships = new HashSet<>();
@ -163,19 +203,18 @@ public class AttributesToJSON extends AbstractProcessor {
return relationships; return relationships;
} }
/** /**
* Builds the Map of attributes that should be included in the JSON that is emitted from this process. * Builds the Map of attributes that should be included in the JSON that is emitted from this process.
* *
* @return * @return
* Map of values that are feed to a Jackson ObjectMapper * Map of values that are feed to a Jackson ObjectMapper
*/ */
protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Set<String> attributesToRemove, protected Map<String, Object> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Set<String> attributesToRemove,
boolean nullValForEmptyString, Pattern attPattern) { boolean nullValForEmptyString, Pattern attPattern) {
Map<String, String> result; Map<String, Object> result;
//If list of attributes specified get only those attributes. Otherwise write them all //If list of attributes specified get only those attributes. Otherwise write them all
if (attributes != null || attPattern != null) { if (attributes != null || attPattern != null) {
result = new HashMap<>(); result = new LinkedHashMap<>();
if(attributes != null) { if(attributes != null) {
for (String attribute : attributes) { for (String attribute : attributes) {
String val = ff.getAttribute(attribute); String val = ff.getAttribute(attribute);
@ -195,7 +234,7 @@ public class AttributesToJSON extends AbstractProcessor {
} }
} else { } else {
Map<String, String> ffAttributes = ff.getAttributes(); Map<String, String> ffAttributes = ff.getAttributes();
result = new HashMap<>(ffAttributes.size()); result = new LinkedHashMap<>(ffAttributes.size());
for (Map.Entry<String, String> e : ffAttributes.entrySet()) { for (Map.Entry<String, String> e : ffAttributes.entrySet()) {
if (!attributesToRemove.contains(e.getKey())) { if (!attributesToRemove.contains(e.getKey())) {
result.put(e.getKey(), e.getValue()); result.put(e.getKey(), e.getValue());
@ -228,6 +267,8 @@ public class AttributesToJSON extends AbstractProcessor {
attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue()); attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue());
nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean(); nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue()); destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
jsonHandlingStrategy = JsonHandlingStrategy.valueOf(context.getProperty(JSON_HANDLING_STRATEGY).getValue());
if(context.getProperty(ATTRIBUTES_REGEX).isSet()) { if(context.getProperty(ATTRIBUTES_REGEX).isSet()) {
pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue()); pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue());
} }
@ -240,19 +281,20 @@ public class AttributesToJSON extends AbstractProcessor {
return; return;
} }
final Map<String, String> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString, pattern); final Map<String, Object> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString, pattern);
try { try {
Map<String, Object> formattedAttributes = getFormattedAttributes(atrList);
if (destinationContent) { if (destinationContent) {
FlowFile conFlowfile = session.write(original, (in, out) -> { FlowFile conFlowfile = session.write(original, (in, out) -> {
try (OutputStream outputStream = new BufferedOutputStream(out)) { try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(objectMapper.writeValueAsBytes(atrList)); outputStream.write(objectMapper.writeValueAsBytes(formattedAttributes));
} }
}); });
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
session.transfer(conFlowfile, REL_SUCCESS); session.transfer(conFlowfile, REL_SUCCESS);
} else { } else {
FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atrList)); FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(formattedAttributes));
session.transfer(atFlowfile, REL_SUCCESS); session.transfer(atFlowfile, REL_SUCCESS);
} }
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
@ -260,4 +302,30 @@ public class AttributesToJSON extends AbstractProcessor {
session.transfer(original, REL_FAILURE); session.transfer(original, REL_FAILURE);
} }
} }
private Map<String, Object> getFormattedAttributes(Map<String, Object> flowFileAttributes) throws JsonProcessingException {
if (JsonHandlingStrategy.ESCAPED == jsonHandlingStrategy) {
return flowFileAttributes;
}
Map<String, Object> formattedAttributes = new LinkedHashMap<>();
for (Map.Entry<String, Object> entry : flowFileAttributes.entrySet()) {
String value = (String) entry.getValue();
if (StringUtils.isNotBlank(value) && (isPossibleJsonArray(value) || isPossibleJsonObject(value))) {
formattedAttributes.put(entry.getKey(), objectMapper.readTree(value));
} else {
formattedAttributes.put(entry.getKey(), value);
}
}
return formattedAttributes;
}
private boolean isPossibleJsonArray(String value) {
return value.startsWith("[") && value.endsWith("]");
}
private boolean isPossibleJsonObject(String value) {
return value.startsWith("{") && value.endsWith("}");
}
} }

View File

@ -21,20 +21,29 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -455,4 +464,104 @@ public class TestAttributesToJSON {
assertTrue(val.keySet().contains("test")); assertTrue(val.keySet().contains("test"));
assertTrue(val.keySet().contains("test1")); assertTrue(val.keySet().contains("test1"));
} }
@ParameterizedTest
@MethodSource("getNestedJson")
public void testAttributeWithNestedJsonOutputAsJsonInContent(String nestedJson, Class<?> expectedClass) throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
testRunner.setProperty(AttributesToJSON.JSON_HANDLING_STRATEGY,
AttributesToJSON.JsonHandlingStrategy.NESTED.getValue());
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, nestedJson);
testRunner.enqueue(ff);
testRunner.run();
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS);
MockFlowFile flowFile = flowFilesForRelationship.get(0);
assertEquals(AttributesToJSON.APPLICATION_JSON, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
Map<String, Object> val = new ObjectMapper().readValue(flowFile.toByteArray(), Map.class);
assertInstanceOf(expectedClass, val.get(TEST_ATTRIBUTE_KEY));
}
private static Stream<Arguments> getNestedJson() throws IOException {
return Stream.of(
Arguments.of(Files.readString(Paths.get("src/test/resources/TestJson/json-sample.json")), List.class),
Arguments.of(Files.readString(Paths.get("src/test/resources/TestJoltTransformJson/input.json")), Map.class));
}
@ParameterizedTest
@MethodSource("getNestedJson")
public void testAttributeWithNestedJsonOutputAsJsonInAttribute(String nestedJson, Class<?> expectedClass) throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
testRunner.setProperty(AttributesToJSON.JSON_HANDLING_STRATEGY,
AttributesToJSON.JsonHandlingStrategy.NESTED.getValue());
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, nestedJson);
testRunner.enqueue(ff);
testRunner.run();
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
.assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> val = mapper.readValue(json, Map.class);
assertInstanceOf(expectedClass, val.get(TEST_ATTRIBUTE_KEY));
}
@ParameterizedTest
@ValueSource(strings = {"[THIS IS NOT JSON]", "{THIS IS NOT JSON}"})
public void testAttributesWithLookALikeJson(String lookAlikeJson) {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
testRunner.setProperty(AttributesToJSON.JSON_HANDLING_STRATEGY,
AttributesToJSON.JsonHandlingStrategy.NESTED.getValue());
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, lookAlikeJson);
testRunner.enqueue(ff);
testRunner.run();
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 0);
MockComponentLog logger = testRunner.getLogger();
assertTrue(logger.getErrorMessages().get(0).getMsg().contains("expecting"));
}
@ParameterizedTest
@MethodSource("getNestedJson")
public void testAttributeWithNestedJsonOutputAsStringInAttribute(String nestedJson) throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, nestedJson);
testRunner.enqueue(ff);
testRunner.run();
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
.assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> val = mapper.readValue(json, Map.class);
assertInstanceOf(String.class, val.get(TEST_ATTRIBUTE_KEY));
}
} }