diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java index 78b8d5843b..cfa4cfe00f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java @@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -36,20 +37,19 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.BufferedOutputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.BufferedOutputStream; import java.io.OutputStream; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.ArrayList; import java.util.Set; import java.util.HashSet; import java.util.Map; -import java.util.HashMap; import java.util.Collections; +import java.util.stream.Collectors; @EventDriven @SideEffectFree @@ -66,7 +66,7 @@ public class AttributesToJSON extends AbstractProcessor { public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; public static final String DESTINATION_CONTENT = "flowfile-content"; - private static final String APPLICATION_JSON = "application/json"; + public static final String APPLICATION_JSON = "application/json"; public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() @@ -116,6 +116,10 @@ public class AttributesToJSON extends AbstractProcessor { private List properties; private Set relationships; private static final ObjectMapper objectMapper = new ObjectMapper(); + private volatile Set attributesToRemove; + private volatile Set attributes; + private volatile Boolean nullValueForEmptyString; + private volatile boolean destinationContent; @Override protected void init(final ProcessorInitializationContext context) { @@ -149,55 +153,57 @@ public class AttributesToJSON extends AbstractProcessor { * @return * Map of values that are feed to a Jackson ObjectMapper */ - protected Map buildAttributesMapForFlowFile(FlowFile ff, String atrList, - boolean includeCoreAttributes, - boolean nullValForEmptyString) { - - Map atsToWrite = new HashMap<>(); + protected Map buildAttributesMapForFlowFile(FlowFile ff, Set attributes, Set attributesToRemove, boolean nullValForEmptyString) { + Map result; + //If list of attributes specified get only those attributes. Otherwise write them all + if (attributes != null) { + result = new HashMap<>(attributes.size()); + for (String attribute : attributes) { + String val = ff.getAttribute(attribute); + if (val != null || nullValForEmptyString) { + result.put(attribute, val); + } else { + result.put(attribute, ""); + } + } + } else { + Map ffAttributes = ff.getAttributes(); + result = new HashMap<>(ffAttributes.size()); + for (Map.Entry e : ffAttributes.entrySet()) { + if (!attributesToRemove.contains(e.getKey())) { + result.put(e.getKey(), e.getValue()); + } + } + } + return result; + } + private Set buildAtrs(String atrList, Set atrsToExclude) { //If list of attributes specified get only those attributes. Otherwise write them all if (StringUtils.isNotBlank(atrList)) { String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR); if (ats != null) { + Set result = new HashSet<>(ats.length); for (String str : ats) { - String cleanStr = str.trim(); - String val = ff.getAttribute(cleanStr); - if (val != null) { - atsToWrite.put(cleanStr, val); - } else { - if (nullValForEmptyString) { - atsToWrite.put(cleanStr, null); - } else { - atsToWrite.put(cleanStr, ""); - } + String trim = str.trim(); + if (!atrsToExclude.contains(trim)) { + result.add(trim); } } + return result; } - } else { - atsToWrite.putAll(ff.getAttributes()); } - - if (!includeCoreAttributes) { - atsToWrite = removeCoreAttributes(atsToWrite); - } - - return atsToWrite; + return null; } - /** - * Remove all of the CoreAttributes from the Attributes that will be written to the Flowfile. - * - * @param atsToWrite - * List of Attributes that have already been generated including the CoreAttributes - * - * @return - * Difference of all attributes minus the CoreAttributes - */ - protected Map removeCoreAttributes(Map atsToWrite) { - for (CoreAttributes c : CoreAttributes.values()) { - atsToWrite.remove(c.key()); - } - return atsToWrite; + @OnScheduled + public void onScheduled(ProcessContext context) { + attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Collections.EMPTY_SET : Arrays.stream(CoreAttributes.values()) + .map(CoreAttributes::key) + .collect(Collectors.toSet()); + attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue(), attributesToRemove); + nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean(); + destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue()); } @Override @@ -207,33 +213,21 @@ public class AttributesToJSON extends AbstractProcessor { return; } - final Map atrList = buildAttributesMapForFlowFile(original, - context.getProperty(ATTRIBUTES_LIST).getValue(), - context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(), - context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean()); + final Map atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString); try { - - switch (context.getProperty(DESTINATION).getValue()) { - case DESTINATION_ATTRIBUTE: - FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, - objectMapper.writeValueAsString(atrList)); - session.transfer(atFlowfile, REL_SUCCESS); - break; - case DESTINATION_CONTENT: - FlowFile conFlowfile = session.write(original, new StreamCallback() { - @Override - public void process(InputStream in, OutputStream out) throws IOException { - try (OutputStream outputStream = new BufferedOutputStream(out)) { - outputStream.write(objectMapper.writeValueAsBytes(atrList)); - } - } - }); - conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); - session.transfer(conFlowfile, REL_SUCCESS); - break; + if (destinationContent) { + FlowFile conFlowfile = session.write(original, (in, out) -> { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write(objectMapper.writeValueAsBytes(atrList)); + } + }); + conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); + session.transfer(conFlowfile, REL_SUCCESS); + } else { + FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atrList)); + session.transfer(atFlowfile, REL_SUCCESS); } - } catch (JsonProcessingException e) { getLogger().error(e.getMessage()); session.transfer(original, REL_FAILURE); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java index 0f9ec2616c..5c8df9bdc8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java @@ -19,13 +19,20 @@ package org.apache.nifi.processors.standard; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; +import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -193,7 +200,6 @@ public class TestAttributesToJSON { testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}"); } - @Test public void testAttribute_singleUserDefinedAttribute() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); @@ -279,4 +285,107 @@ public class TestAttributesToJSON { assertTrue(val.size() == 1); } + @Test + public void testAttribute_noIncludeCoreAttributesUserDefined() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " , " + CoreAttributes.PATH.key() + " "); + testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE); + ff = session.putAttribute(ff, CoreAttributes.PATH.key(), TEST_ATTRIBUTE_VALUE); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0) + .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS) + .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME); + + ObjectMapper mapper = new ObjectMapper(); + Map val = mapper.readValue(json, HashMap.class); + assertEquals(TEST_ATTRIBUTE_VALUE, val.get(TEST_ATTRIBUTE_KEY)); + assertEquals(1, val.size()); + } + + @Test + public void testAttribute_noIncludeCoreAttributesContent() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE); + ff = session.putAttribute(ff, CoreAttributes.PATH.key(), TEST_ATTRIBUTE_VALUE); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + + ObjectMapper mapper = new ObjectMapper(); + Map val = mapper.readValue(testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).toByteArray(), HashMap.class); + assertEquals(TEST_ATTRIBUTE_VALUE, val.get(TEST_ATTRIBUTE_KEY)); + assertEquals(1, val.size()); + } + + @Test + public void testAttribute_includeCoreAttributesContent() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT); + testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "true"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertEquals(AttributesToJSON.APPLICATION_JSON, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + Map val = new ObjectMapper().readValue(flowFile.toByteArray(), HashMap.class); + assertEquals(3, val.size()); + Set coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet()); + val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k))); + } + + @Test + public void testAttribute_includeCoreAttributesAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "true"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + Map val = new ObjectMapper().readValue(flowFile.getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME), HashMap.class); + assertEquals(3, val.size()); + Set coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet()); + val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k))); + } }