diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java index 7098b6e7ec..950b8d317c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java @@ -8,16 +8,22 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.*; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.*; @EventDriven @SideEffectFree @SupportsBatching -@Tags({"JSON", "attributes"}) +@Tags({"json", "attributes"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @CapabilityDescription("Evaluates the attributes from a FlowFile and generates a JSON string with the attribute key/value pair. " + "The resulting JSON string is placed in the FlowFile as a new Attribute with the name 'JSONAttributes'. By default all" + @@ -28,16 +34,48 @@ public class AttributesToJSON extends AbstractProcessor { public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute"; private static final String AT_LIST_SEPARATOR = ","; - private static final String DEFAULT_VALUE_IF_ATT_NOT_PRESENT = ""; + + public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; + public static final String DESTINATION_CONTENT = "flowfile-content"; + public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() .name("Attributes List") - .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' attribute. This list of attributes is case sensitive. If a specified attribute is not found" + + .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' " + + "attribute. This list of attributes is case sensitive. If a specified attribute is not found" + "in the flowfile an empty string will be output for that attritbute in the resulting JSON") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("Destination") + .description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " + + "or written in the flowfile content. " + + "Writing to flowfile content will overwrite any existing flowfile content.") + .required(true) + .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT) + .defaultValue(DESTINATION_ATTRIBUTE) + .build(); + + public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder() + .name("Include Core Attributes") + .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes which are " + + "contained in every FlowFile should be included in the final JSON value generated.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder() + .name("Null Value") + .description("If an Attribute is value is empty or not present this property determines if an empty string" + + "or true NULL value is present in the resulting JSON output") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") .description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been successfully added to the flowfile").build(); @@ -52,6 +90,9 @@ public class AttributesToJSON extends AbstractProcessor { protected void init(final ProcessorInitializationContext context) { final List properties = new ArrayList<>(); properties.add(ATTRIBUTES_LIST); + properties.add(DESTINATION); + properties.add(INCLUDE_CORE_ATTRIBUTES); + properties.add(NULL_VALUE_FOR_EMPTY_STRING); this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -70,6 +111,64 @@ public class AttributesToJSON extends AbstractProcessor { return relationships; } + + /** + * Builds the Map of attributes that should be included in the JSON that is emitted from this process. + * + * @return + * Map of values that are feed to a Jackson ObjectMapper + */ + protected Map buildAttributesMapForFlowFile(FlowFile ff, String atrList, + boolean includeCoreAttributes, + boolean nullValForEmptyString) { + + Map atsToWrite = new HashMap<>(); + + //If list of attributes specified get only those attributes. Otherwise write them all + if (StringUtils.isNotBlank(atrList)) { + String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR); + if (ats != null) { + for (String str : ats) { + String cleanStr = str.trim(); + String val = ff.getAttribute(cleanStr); + if (val != null) { + atsToWrite.put(cleanStr, val); + } else { + if (nullValForEmptyString) { + atsToWrite.put(cleanStr, null); + } else { + atsToWrite.put(cleanStr, ""); + } + } + } + } + } else { + atsToWrite.putAll(ff.getAttributes()); + } + + if (!includeCoreAttributes) { + atsToWrite = removeCoreAttributes(atsToWrite); + } + + return atsToWrite; + } + + /** + * Remove all of the CoreAttributes from the Attributes that will be written to the Flowfile. + * + * @param atsToWrite + * List of Attributes that have already been generated including the CoreAttributes + * + * @return + * Difference of all attributes minus the CoreAttributes + */ + protected Map removeCoreAttributes(Map atsToWrite) { + for (CoreAttributes c : CoreAttributes.values()) { + atsToWrite.remove(c.key()); + } + return atsToWrite; + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final FlowFile original = session.get(); @@ -77,41 +176,35 @@ public class AttributesToJSON extends AbstractProcessor { return; } - final String atList = context.getProperty(ATTRIBUTES_LIST).getValue(); - Map atsToWrite = null; + final Map atrList = buildAttributesMapForFlowFile(original, + context.getProperty(ATTRIBUTES_LIST).getValue(), + context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(), + context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean()); - //If list of attributes specified get only those attributes. Otherwise write them all - if (atList != null && !StringUtils.isEmpty(atList)) { - atsToWrite = new HashMap<>(); - String[] ats = StringUtils.split(atList, AT_LIST_SEPARATOR); - if (ats != null) { - for (String str : ats) { - String cleanStr = str.trim(); - String val = original.getAttribute(cleanStr); - if (val != null) { - atsToWrite.put(cleanStr, val); - } else { - atsToWrite.put(cleanStr, DEFAULT_VALUE_IF_ATT_NOT_PRESENT); - } - } + try { + + switch (context.getProperty(DESTINATION).getValue()) { + case DESTINATION_ATTRIBUTE: + FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, + objectMapper.writeValueAsString(atrList)); + session.transfer(atFlowfile, REL_SUCCESS); + break; + case DESTINATION_CONTENT: + FlowFile conFlowfile = session.write(original, new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write(objectMapper.writeValueAsBytes(atrList)); + } + } + }); + session.transfer(conFlowfile, REL_SUCCESS); + break; } - } else { - atsToWrite = original.getAttributes(); - } - if (atsToWrite != null) { - if (atsToWrite.size() == 0) { - getLogger().debug("Flowfile contains no attributes to convert to JSON"); - } else { - try { - FlowFile updated = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atsToWrite)); - session.transfer(updated, REL_SUCCESS); - } catch (JsonProcessingException e) { - getLogger().error(e.getMessage()); - session.transfer(original, REL_FAILURE); - } - } + } catch (JsonProcessingException e) { + getLogger().error(e.getMessage()); + session.transfer(original, REL_FAILURE); } - } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java index a057d15de7..1624c4b267 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java @@ -6,28 +6,17 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class TestAttributesToJSON { - private static Logger LOGGER; - - static { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); - System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.AttributesToJSON", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestAttributesToJSON", "debug"); - LOGGER = LoggerFactory.getLogger(TestAttributesToJSON.class); - } - private static final String TEST_ATTRIBUTE_KEY = "TestAttribute"; private static final String TEST_ATTRIBUTE_VALUE = "TestValue"; @@ -45,9 +34,84 @@ public class TestAttributesToJSON { testRunner.run(); } + @Test(expected = AssertionError.class) + public void testInvalidIncludeCoreAttributesProperty() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "val1,val2"); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); + testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "maybe"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + } + + @Test + public void testNullValueForEmptyAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); + final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey"; + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "true"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + //Expecting success transition because Jackson is taking care of escaping the bad JSON characters + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). + assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + //Make sure that the value is a true JSON null for the non existing attribute + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + ObjectMapper mapper = new ObjectMapper(); + Map val = mapper.readValue(json, HashMap.class); + + assertNull(val.get(NON_PRESENT_ATTRIBUTE_KEY)); + } + + @Test + public void testEmptyStringValueForEmptyAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); + final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey"; + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + //Expecting success transition because Jackson is taking care of escaping the bad JSON characters + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). + assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + //Make sure that the value is a true JSON null for the non existing attribute + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + ObjectMapper mapper = new ObjectMapper(); + Map val = mapper.readValue(json, HashMap.class); + + assertEquals(val.get(NON_PRESENT_ATTRIBUTE_KEY), ""); + } + @Test public void testInvalidJSONValueInAttribute() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create(); @@ -67,8 +131,9 @@ public class TestAttributesToJSON { @Test - public void testAttribuets_emptyListUserSpecifiedAttributes() throws Exception { + public void testAttributes_emptyListUserSpecifiedAttributes() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create(); @@ -92,10 +157,31 @@ public class TestAttributesToJSON { } + @Test + public void testContent_emptyListUserSpecifiedAttributes() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT); + testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0). + assertAttributeNotExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}"); + } + + @Test public void testAttribute_singleUserDefinedAttribute() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create(); @@ -123,6 +209,7 @@ public class TestAttributesToJSON { public void testAttribute_singleUserDefinedAttributeWithWhiteSpace() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " "); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create(); @@ -150,6 +237,7 @@ public class TestAttributesToJSON { public void testAttribute_singleNonExistingUserDefinedAttribute() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "NonExistingAttribute"); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); FlowFile ff = session.create();