diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java index b9af5c1c5d..91a5448380 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java @@ -21,36 +21,37 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.DescribedValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import java.io.BufferedOutputStream; import java.io.OutputStream; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.regex.Pattern; -import java.util.HashSet; -import java.util.Map; -import java.util.Collections; import java.util.stream.Collectors; @EventDriven @@ -59,9 +60,37 @@ import java.util.stream.Collectors; @Tags({"json", "attributes", "flowfile"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @CapabilityDescription("Generates a JSON representation of the input FlowFile Attributes. The resulting JSON " + - "can be written to either a new Attribute 'JSONAttributes' or written to the FlowFile as content.") + "can be written to either a new Attribute 'JSONAttributes' or written to the FlowFile as content. Attributes " + + " which contain nested JSON objects can either be handled as JSON or as escaped JSON depending on the strategy chosen.") @WritesAttribute(attribute = "JSONAttributes", description = "JSON representation of Attributes") public class AttributesToJSON extends AbstractProcessor { + public enum JsonHandlingStrategy implements DescribedValue { + ESCAPED("Escaped", "Escapes JSON attribute values to strings"), + NESTED("Nested", "Handles JSON attribute values as nested structured objects or arrays"); + + JsonHandlingStrategy(String displayName, String description) { + this.displayName = displayName; + this.description = description; + } + + private final String displayName; + private final String description; + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + } public static final String JSON_ATTRIBUTE_NAME = "JSONAttributes"; private static final String AT_LIST_SEPARATOR = ","; @@ -122,6 +151,15 @@ public class AttributesToJSON extends AbstractProcessor { .defaultValue("false") .build(); + public static final PropertyDescriptor JSON_HANDLING_STRATEGY = new PropertyDescriptor.Builder() + .name("JSON Handling Strategy") + .displayName("JSON Handling Strategy") + .description("Strategy to use for handling attributes which contain nested JSON.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues(JsonHandlingStrategy.class) + .defaultValue(AttributesToJSON.JsonHandlingStrategy.ESCAPED.getValue()) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") .description("Successfully converted attributes to JSON").build(); @@ -136,6 +174,7 @@ public class AttributesToJSON extends AbstractProcessor { private volatile Boolean nullValueForEmptyString; private volatile boolean destinationContent; private volatile Pattern pattern; + private volatile JsonHandlingStrategy jsonHandlingStrategy; @Override protected void init(final ProcessorInitializationContext context) { @@ -145,6 +184,7 @@ public class AttributesToJSON extends AbstractProcessor { properties.add(DESTINATION); properties.add(INCLUDE_CORE_ATTRIBUTES); properties.add(NULL_VALUE_FOR_EMPTY_STRING); + properties.add(JSON_HANDLING_STRATEGY); this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -163,19 +203,18 @@ public class AttributesToJSON extends AbstractProcessor { return relationships; } - /** * Builds the Map of attributes that should be included in the JSON that is emitted from this process. * * @return * Map of values that are feed to a Jackson ObjectMapper */ - protected Map buildAttributesMapForFlowFile(FlowFile ff, Set attributes, Set attributesToRemove, + protected Map buildAttributesMapForFlowFile(FlowFile ff, Set attributes, Set attributesToRemove, boolean nullValForEmptyString, Pattern attPattern) { - Map result; + Map result; //If list of attributes specified get only those attributes. Otherwise write them all if (attributes != null || attPattern != null) { - result = new HashMap<>(); + result = new LinkedHashMap<>(); if(attributes != null) { for (String attribute : attributes) { String val = ff.getAttribute(attribute); @@ -195,7 +234,7 @@ public class AttributesToJSON extends AbstractProcessor { } } else { Map ffAttributes = ff.getAttributes(); - result = new HashMap<>(ffAttributes.size()); + result = new LinkedHashMap<>(ffAttributes.size()); for (Map.Entry e : ffAttributes.entrySet()) { if (!attributesToRemove.contains(e.getKey())) { result.put(e.getKey(), e.getValue()); @@ -228,6 +267,8 @@ public class AttributesToJSON extends AbstractProcessor { attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue()); nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean(); destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue()); + jsonHandlingStrategy = JsonHandlingStrategy.valueOf(context.getProperty(JSON_HANDLING_STRATEGY).getValue()); + if(context.getProperty(ATTRIBUTES_REGEX).isSet()) { pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue()); } @@ -240,19 +281,20 @@ public class AttributesToJSON extends AbstractProcessor { return; } - final Map atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString, pattern); + final Map atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString, pattern); try { + Map formattedAttributes = getFormattedAttributes(atrList); if (destinationContent) { FlowFile conFlowfile = session.write(original, (in, out) -> { try (OutputStream outputStream = new BufferedOutputStream(out)) { - outputStream.write(objectMapper.writeValueAsBytes(atrList)); + outputStream.write(objectMapper.writeValueAsBytes(formattedAttributes)); } }); conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); session.transfer(conFlowfile, REL_SUCCESS); } else { - FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atrList)); + FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(formattedAttributes)); session.transfer(atFlowfile, REL_SUCCESS); } } catch (JsonProcessingException e) { @@ -260,4 +302,30 @@ public class AttributesToJSON extends AbstractProcessor { session.transfer(original, REL_FAILURE); } } + + private Map getFormattedAttributes(Map flowFileAttributes) throws JsonProcessingException { + if (JsonHandlingStrategy.ESCAPED == jsonHandlingStrategy) { + return flowFileAttributes; + } + + Map formattedAttributes = new LinkedHashMap<>(); + for (Map.Entry entry : flowFileAttributes.entrySet()) { + String value = (String) entry.getValue(); + if (StringUtils.isNotBlank(value) && (isPossibleJsonArray(value) || isPossibleJsonObject(value))) { + formattedAttributes.put(entry.getKey(), objectMapper.readTree(value)); + } else { + formattedAttributes.put(entry.getKey(), value); + } + } + + return formattedAttributes; + } + + private boolean isPossibleJsonArray(String value) { + return value.startsWith("[") && value.endsWith("]"); + } + + private boolean isPossibleJsonObject(String value) { + return value.startsWith("{") && value.endsWith("}"); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java index b6de6ec698..c654cb908d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java @@ -21,20 +21,29 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.util.MockComponentLog; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -455,4 +464,104 @@ public class TestAttributesToJSON { assertTrue(val.keySet().contains("test")); assertTrue(val.keySet().contains("test1")); } + + @ParameterizedTest + @MethodSource("getNestedJson") + public void testAttributeWithNestedJsonOutputAsJsonInContent(String nestedJson, Class expectedClass) throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT); + testRunner.setProperty(AttributesToJSON.JSON_HANDLING_STRATEGY, + AttributesToJSON.JsonHandlingStrategy.NESTED.getValue()); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, nestedJson); + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS); + MockFlowFile flowFile = flowFilesForRelationship.get(0); + assertEquals(AttributesToJSON.APPLICATION_JSON, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + Map val = new ObjectMapper().readValue(flowFile.toByteArray(), Map.class); + assertInstanceOf(expectedClass, val.get(TEST_ATTRIBUTE_KEY)); + } + + private static Stream getNestedJson() throws IOException { + return Stream.of( + Arguments.of(Files.readString(Paths.get("src/test/resources/TestJson/json-sample.json")), List.class), + Arguments.of(Files.readString(Paths.get("src/test/resources/TestJoltTransformJson/input.json")), Map.class)); + } + + @ParameterizedTest + @MethodSource("getNestedJson") + public void testAttributeWithNestedJsonOutputAsJsonInAttribute(String nestedJson, Class expectedClass) throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); + testRunner.setProperty(AttributesToJSON.JSON_HANDLING_STRATEGY, + AttributesToJSON.JsonHandlingStrategy.NESTED.getValue()); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, nestedJson); + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0) + .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + ObjectMapper mapper = new ObjectMapper(); + Map val = mapper.readValue(json, Map.class); + assertInstanceOf(expectedClass, val.get(TEST_ATTRIBUTE_KEY)); + } + + @ParameterizedTest + @ValueSource(strings = {"[THIS IS NOT JSON]", "{THIS IS NOT JSON}"}) + public void testAttributesWithLookALikeJson(String lookAlikeJson) { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT); + testRunner.setProperty(AttributesToJSON.JSON_HANDLING_STRATEGY, + AttributesToJSON.JsonHandlingStrategy.NESTED.getValue()); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, lookAlikeJson); + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 0); + MockComponentLog logger = testRunner.getLogger(); + assertTrue(logger.getErrorMessages().get(0).getMsg().contains("expecting")); + } + + @ParameterizedTest + @MethodSource("getNestedJson") + public void testAttributeWithNestedJsonOutputAsStringInAttribute(String nestedJson) throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, nestedJson); + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0) + .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + ObjectMapper mapper = new ObjectMapper(); + Map val = mapper.readValue(json, Map.class); + assertInstanceOf(String.class, val.get(TEST_ATTRIBUTE_KEY)); + } }