NIFI-1079 Added Destination Property to control if JSON goes to Attribute or

Content of FlowFile. Added Include Core Attributes Property to control
if FlowFile CoreAttributes are included in the JSON output or not.
Added Null value for Empty String flag to control if empty values in
the JSON are empty string or true NULL values. Added more tests and
minor text refactoring per Github comments

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Jeremy Dyer 2015-10-28 20:53:46 -04:00 committed by Bryan Bende
parent 19b7a4cc7d
commit 217b1049cf
2 changed files with 230 additions and 49 deletions

View File

@ -8,16 +8,22 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"JSON", "attributes"})
@Tags({"json", "attributes"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Evaluates the attributes from a FlowFile and generates a JSON string with the attribute key/value pair. " +
"The resulting JSON string is placed in the FlowFile as a new Attribute with the name 'JSONAttributes'. By default all" +
@ -28,16 +34,48 @@ public class AttributesToJSON extends AbstractProcessor {
public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute";
private static final String AT_LIST_SEPARATOR = ",";
private static final String DEFAULT_VALUE_IF_ATT_NOT_PRESENT = "";
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
public static final String DESTINATION_CONTENT = "flowfile-content";
public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
.name("Attributes List")
.description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' attribute. This list of attributes is case sensitive. If a specified attribute is not found" +
.description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' " +
"attribute. This list of attributes is case sensitive. If a specified attribute is not found" +
"in the flowfile an empty string will be output for that attritbute in the resulting JSON")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
.name("Destination")
.description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " +
"or written in the flowfile content. " +
"Writing to flowfile content will overwrite any existing flowfile content.")
.required(true)
.allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
.defaultValue(DESTINATION_ATTRIBUTE)
.build();
public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("Include Core Attributes")
.description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes which are " +
"contained in every FlowFile should be included in the final JSON value generated.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
.name("Null Value")
.description("If an Attribute is value is empty or not present this property determines if an empty string" +
"or true NULL value is present in the resulting JSON output")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been successfully added to the flowfile").build();
@ -52,6 +90,9 @@ public class AttributesToJSON extends AbstractProcessor {
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ATTRIBUTES_LIST);
properties.add(DESTINATION);
properties.add(INCLUDE_CORE_ATTRIBUTES);
properties.add(NULL_VALUE_FOR_EMPTY_STRING);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
@ -70,6 +111,64 @@ public class AttributesToJSON extends AbstractProcessor {
return relationships;
}
/**
* Builds the Map of attributes that should be included in the JSON that is emitted from this process.
*
* @return
* Map<String, String> of values that are feed to a Jackson ObjectMapper
*/
protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, String atrList,
boolean includeCoreAttributes,
boolean nullValForEmptyString) {
Map<String, String> atsToWrite = new HashMap<>();
//If list of attributes specified get only those attributes. Otherwise write them all
if (StringUtils.isNotBlank(atrList)) {
String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR);
if (ats != null) {
for (String str : ats) {
String cleanStr = str.trim();
String val = ff.getAttribute(cleanStr);
if (val != null) {
atsToWrite.put(cleanStr, val);
} else {
if (nullValForEmptyString) {
atsToWrite.put(cleanStr, null);
} else {
atsToWrite.put(cleanStr, "");
}
}
}
}
} else {
atsToWrite.putAll(ff.getAttributes());
}
if (!includeCoreAttributes) {
atsToWrite = removeCoreAttributes(atsToWrite);
}
return atsToWrite;
}
/**
* Remove all of the CoreAttributes from the Attributes that will be written to the Flowfile.
*
* @param atsToWrite
* List of Attributes that have already been generated including the CoreAttributes
*
* @return
* Difference of all attributes minus the CoreAttributes
*/
protected Map<String, String> removeCoreAttributes(Map<String, String> atsToWrite) {
for (CoreAttributes c : CoreAttributes.values()) {
atsToWrite.remove(c.key());
}
return atsToWrite;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile original = session.get();
@ -77,41 +176,35 @@ public class AttributesToJSON extends AbstractProcessor {
return;
}
final String atList = context.getProperty(ATTRIBUTES_LIST).getValue();
Map<String, String> atsToWrite = null;
final Map<String, String> atrList = buildAttributesMapForFlowFile(original,
context.getProperty(ATTRIBUTES_LIST).getValue(),
context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(),
context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean());
//If list of attributes specified get only those attributes. Otherwise write them all
if (atList != null && !StringUtils.isEmpty(atList)) {
atsToWrite = new HashMap<>();
String[] ats = StringUtils.split(atList, AT_LIST_SEPARATOR);
if (ats != null) {
for (String str : ats) {
String cleanStr = str.trim();
String val = original.getAttribute(cleanStr);
if (val != null) {
atsToWrite.put(cleanStr, val);
} else {
atsToWrite.put(cleanStr, DEFAULT_VALUE_IF_ATT_NOT_PRESENT);
}
}
try {
switch (context.getProperty(DESTINATION).getValue()) {
case DESTINATION_ATTRIBUTE:
FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME,
objectMapper.writeValueAsString(atrList));
session.transfer(atFlowfile, REL_SUCCESS);
break;
case DESTINATION_CONTENT:
FlowFile conFlowfile = session.write(original, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(objectMapper.writeValueAsBytes(atrList));
}
}
});
session.transfer(conFlowfile, REL_SUCCESS);
break;
}
} else {
atsToWrite = original.getAttributes();
}
if (atsToWrite != null) {
if (atsToWrite.size() == 0) {
getLogger().debug("Flowfile contains no attributes to convert to JSON");
} else {
try {
FlowFile updated = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atsToWrite));
session.transfer(updated, REL_SUCCESS);
} catch (JsonProcessingException e) {
getLogger().error(e.getMessage());
session.transfer(original, REL_FAILURE);
}
}
} catch (JsonProcessingException e) {
getLogger().error(e.getMessage());
session.transfer(original, REL_FAILURE);
}
}
}

View File

@ -6,28 +6,17 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestAttributesToJSON {
private static Logger LOGGER;
static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.AttributesToJSON", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestAttributesToJSON", "debug");
LOGGER = LoggerFactory.getLogger(TestAttributesToJSON.class);
}
private static final String TEST_ATTRIBUTE_KEY = "TestAttribute";
private static final String TEST_ATTRIBUTE_VALUE = "TestValue";
@ -45,9 +34,84 @@ public class TestAttributesToJSON {
testRunner.run();
}
@Test(expected = AssertionError.class)
public void testInvalidIncludeCoreAttributesProperty() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "val1,val2");
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "maybe");
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
testRunner.enqueue(ff);
testRunner.run();
}
@Test
public void testNullValueForEmptyAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey";
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "true");
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
testRunner.enqueue(ff);
testRunner.run();
//Expecting success transition because Jackson is taking care of escaping the bad JSON characters
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
//Make sure that the value is a true JSON null for the non existing attribute
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
ObjectMapper mapper = new ObjectMapper();
Map<String, String> val = mapper.readValue(json, HashMap.class);
assertNull(val.get(NON_PRESENT_ATTRIBUTE_KEY));
}
@Test
public void testEmptyStringValueForEmptyAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey";
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "false");
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
testRunner.enqueue(ff);
testRunner.run();
//Expecting success transition because Jackson is taking care of escaping the bad JSON characters
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
//Make sure that the value is a true JSON null for the non existing attribute
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
ObjectMapper mapper = new ObjectMapper();
Map<String, String> val = mapper.readValue(json, HashMap.class);
assertEquals(val.get(NON_PRESENT_ATTRIBUTE_KEY), "");
}
@Test
public void testInvalidJSONValueInAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
@ -67,8 +131,9 @@ public class TestAttributesToJSON {
@Test
public void testAttribuets_emptyListUserSpecifiedAttributes() throws Exception {
public void testAttributes_emptyListUserSpecifiedAttributes() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
@ -92,10 +157,31 @@ public class TestAttributesToJSON {
}
@Test
public void testContent_emptyListUserSpecifiedAttributes() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false");
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
testRunner.enqueue(ff);
testRunner.run();
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
assertAttributeNotExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}");
}
@Test
public void testAttribute_singleUserDefinedAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY);
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
@ -123,6 +209,7 @@ public class TestAttributesToJSON {
public void testAttribute_singleUserDefinedAttributeWithWhiteSpace() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " ");
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
@ -150,6 +237,7 @@ public class TestAttributesToJSON {
public void testAttribute_singleNonExistingUserDefinedAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "NonExistingAttribute");
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();