From 217b1049cf73c8ecef7dd76a954f07a7c7224cc6 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Wed, 28 Oct 2015 20:53:46 -0400 Subject: [PATCH] NIFI-1079 Added Destination Property to control if JSON goes to Attribute or Content of FlowFile. Added Include Core Attributes Property to control if FlowFile CoreAttributes are included in the JSON output or not. Added Null value for Empty String flag to control if empty values in the JSON are empty string or true NULL values. Added more tests and minor text refactoring per Github comments Signed-off-by: Bryan Bende --- .../processors/standard/AttributesToJSON.java | 163 ++++++++++++++---- .../standard/TestAttributesToJSON.java | 116 +++++++++++-- 2 files changed, 230 insertions(+), 49 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java index 7098b6e7ec..950b8d317c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java @@ -8,16 +8,22 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.*; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.*; @EventDriven @SideEffectFree @SupportsBatching -@Tags({"JSON", "attributes"}) +@Tags({"json", "attributes"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @CapabilityDescription("Evaluates the attributes from a FlowFile and generates a JSON string with the attribute key/value pair. " + "The resulting JSON string is placed in the FlowFile as a new Attribute with the name 'JSONAttributes'. By default all" + @@ -28,16 +34,48 @@ public class AttributesToJSON extends AbstractProcessor { public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute"; private static final String AT_LIST_SEPARATOR = ","; - private static final String DEFAULT_VALUE_IF_ATT_NOT_PRESENT = ""; + + public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; + public static final String DESTINATION_CONTENT = "flowfile-content"; + public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() .name("Attributes List") - .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' attribute. This list of attributes is case sensitive. If a specified attribute is not found" + + .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' " + + "attribute. This list of attributes is case sensitive. If a specified attribute is not found" + "in the flowfile an empty string will be output for that attritbute in the resulting JSON") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("Destination") + .description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " + + "or written in the flowfile content. " + + "Writing to flowfile content will overwrite any existing flowfile content.") + .required(true) + .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT) + .defaultValue(DESTINATION_ATTRIBUTE) + .build(); + + public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder() + .name("Include Core Attributes") + .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes which are " + + "contained in every FlowFile should be included in the final JSON value generated.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder() + .name("Null Value") + .description("If an Attribute is value is empty or not present this property determines if an empty string" + + "or true NULL value is present in the resulting JSON output") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") .description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been successfully added to the flowfile").build(); @@ -52,6 +90,9 @@ public class AttributesToJSON extends AbstractProcessor { protected void init(final ProcessorInitializationContext context) { final List properties = new ArrayList<>(); properties.add(ATTRIBUTES_LIST); + properties.add(DESTINATION); + properties.add(INCLUDE_CORE_ATTRIBUTES); + properties.add(NULL_VALUE_FOR_EMPTY_STRING); this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -70,6 +111,64 @@ public class AttributesToJSON extends AbstractProcessor { return relationships; } + + /** + * Builds the Map of attributes that should be included in the JSON that is emitted from this process. + * + * @return + * Map of values that are feed to a Jackson ObjectMapper + */ + protected Map buildAttributesMapForFlowFile(FlowFile ff, String atrList, + boolean includeCoreAttributes, + boolean nullValForEmptyString) { + + Map atsToWrite = new HashMap<>(); + + //If list of attributes specified get only those attributes. Otherwise write them all + if (StringUtils.isNotBlank(atrList)) { + String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR); + if (ats != null) { + for (String str : ats) { + String cleanStr = str.trim(); + String val = ff.getAttribute(cleanStr); + if (val != null) { + atsToWrite.put(cleanStr, val); + } else { + if (nullValForEmptyString) { + atsToWrite.put(cleanStr, null); + } else { + atsToWrite.put(cleanStr, ""); + } + } + } + } + } else { + atsToWrite.putAll(ff.getAttributes()); + } + + if (!includeCoreAttributes) { + atsToWrite = removeCoreAttributes(atsToWrite); + } + + return atsToWrite; + } + + /** + * Remove all of the CoreAttributes from the Attributes that will be written to the Flowfile. + * + * @param atsToWrite + * List of Attributes that have already been generated including the CoreAttributes + * + * @return + * Difference of all attributes minus the CoreAttributes + */ + protected Map removeCoreAttributes(Map atsToWrite) { + for (CoreAttributes c : CoreAttributes.values()) { + atsToWrite.remove(c.key()); + } + return atsToWrite; + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final FlowFile original = session.get(); @@ -77,41 +176,35 @@ public class AttributesToJSON extends AbstractProcessor { return; } - final String atList = context.getProperty(ATTRIBUTES_LIST).getValue(); - Map atsToWrite = null; + final Map atrList = buildAttributesMapForFlowFile(original, + context.getProperty(ATTRIBUTES_LIST).getValue(), + context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(), + context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean()); - //If list of attributes specified get only those attributes. Otherwise write them all - if (atList != null && !StringUtils.isEmpty(atList)) { - atsToWrite = new HashMap<>(); - String[] ats = StringUtils.split(atList, AT_LIST_SEPARATOR); - if (ats != null) { - for (String str : ats) { - String cleanStr = str.trim(); - String val = original.getAttribute(cleanStr); - if (val != null) { - atsToWrite.put(cleanStr, val); - } else { - atsToWrite.put(cleanStr, DEFAULT_VALUE_IF_ATT_NOT_PRESENT); - } - } + try { + + switch (context.getProperty(DESTINATION).getValue()) { + case DESTINATION_ATTRIBUTE: + FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, + objectMapper.writeValueAsString(atrList)); + session.transfer(atFlowfile, REL_SUCCESS); + break; + case DESTINATION_CONTENT: + FlowFile conFlowfile = session.write(original, new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write(objectMapper.writeValueAsBytes(atrList)); + } + } + }); + session.transfer(conFlowfile, REL_SUCCESS); + break; } - } else { - atsToWrite = original.getAttributes(); - } - if (atsToWrite != null) { - if (atsToWrite.size() == 0) { - getLogger().debug("Flowfile contains no attributes to convert to JSON"); - } else { - try { - FlowFile updated = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atsToWrite)); - session.transfer(updated, REL_SUCCESS); - } catch (JsonProcessingException e) { - getLogger().error(e.getMessage()); - session.transfer(original, REL_FAILURE); - } - } + } catch (JsonProcessingException e) { + getLogger().error(e.getMessage()); + session.transfer(original, REL_FAILURE); } - } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java index a057d15de7..1624c4b267 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java @@ -6,28 +6,17 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class TestAttributesToJSON { - private static Logger LOGGER; - - static { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); - System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.AttributesToJSON", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestAttributesToJSON", "debug"); - LOGGER = LoggerFactory.getLogger(TestAttributesToJSON.class); - } - private static final String TEST_ATTRIBUTE_KEY = "TestAttribute"; private static final String TEST_ATTRIBUTE_VALUE = "TestValue"; @@ -45,9 +34,84 @@ public class TestAttributesToJSON { testRunner.run(); } + @Test(expected = AssertionError.class) + public void testInvalidIncludeCoreAttributesProperty() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "val1,val2"); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); + testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "maybe"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + } + + @Test + public void testNullValueForEmptyAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); + final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey"; + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "true"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + //Expecting success transition because Jackson is taking care of escaping the bad JSON characters + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). + assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + //Make sure that the value is a true JSON null for the non existing attribute + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + ObjectMapper mapper = new ObjectMapper(); + Map val = mapper.readValue(json, HashMap.class); + + assertNull(val.get(NON_PRESENT_ATTRIBUTE_KEY)); + } + + @Test + public void testEmptyStringValueForEmptyAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); + final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey"; + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + //Expecting success transition because Jackson is taking care of escaping the bad JSON characters + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). + assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + //Make sure that the value is a true JSON null for the non existing attribute + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + ObjectMapper mapper = new ObjectMapper(); + Map val = mapper.readValue(json, HashMap.class); + + assertEquals(val.get(NON_PRESENT_ATTRIBUTE_KEY), ""); + } + @Test public void testInvalidJSONValueInAttribute() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create(); @@ -67,8 +131,9 @@ public class TestAttributesToJSON { @Test - public void testAttribuets_emptyListUserSpecifiedAttributes() throws Exception { + public void testAttributes_emptyListUserSpecifiedAttributes() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create(); @@ -92,10 +157,31 @@ public class TestAttributesToJSON { } + @Test + public void testContent_emptyListUserSpecifiedAttributes() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT); + testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). + assertAttributeNotExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}"); + } + + @Test public void testAttribute_singleUserDefinedAttribute() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create(); @@ -123,6 +209,7 @@ public class TestAttributesToJSON { public void testAttribute_singleUserDefinedAttributeWithWhiteSpace() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " "); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create(); @@ -150,6 +237,7 @@ public class TestAttributesToJSON { public void testAttribute_singleNonExistingUserDefinedAttribute() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "NonExistingAttribute"); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create();