NIFI-12408 Added Pretty Print Property to AttributesToJSON

This closes #8156

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2023-12-13 18:27:17 +00:00 committed by exceptionfactory
parent ee2368e0ae
commit 1ddb5c185a
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 20 additions and 5 deletions

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -159,18 +160,29 @@ public class AttributesToJSON extends AbstractProcessor {
.defaultValue(AttributesToJSON.JsonHandlingStrategy.ESCAPED.getValue())
.build();
public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
.name("Pretty Print")
.displayName("Pretty Print")
.description("Apply pretty print formatting to the output.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.dependsOn(DESTINATION, DESTINATION_CONTENT)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successfully converted attributes to JSON").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed to convert attributes to JSON").build();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private static final ObjectMapper objectMapper = new ObjectMapper();
private volatile Set<String> attributesToRemove;
private volatile Set<String> attributes;
private volatile Boolean nullValueForEmptyString;
private volatile boolean destinationContent;
private volatile ObjectWriter objectWriter;
private volatile Pattern pattern;
private volatile JsonHandlingStrategy jsonHandlingStrategy;
@ -183,6 +195,7 @@ public class AttributesToJSON extends AbstractProcessor {
properties.add(INCLUDE_CORE_ATTRIBUTES);
properties.add(NULL_VALUE_FOR_EMPTY_STRING);
properties.add(JSON_HANDLING_STRATEGY);
properties.add(PRETTY_PRINT);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
@ -265,6 +278,8 @@ public class AttributesToJSON extends AbstractProcessor {
attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue());
nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
final boolean prettyPrint = context.getProperty(PRETTY_PRINT).asBoolean();
objectWriter = destinationContent && prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter() : OBJECT_MAPPER.writer();
jsonHandlingStrategy = JsonHandlingStrategy.valueOf(context.getProperty(JSON_HANDLING_STRATEGY).getValue());
if(context.getProperty(ATTRIBUTES_REGEX).isSet()) {
@ -282,17 +297,17 @@ public class AttributesToJSON extends AbstractProcessor {
final Map<String, Object> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString, pattern);
try {
Map<String, Object> formattedAttributes = getFormattedAttributes(atrList);
final Map<String, Object> formattedAttributes = getFormattedAttributes(atrList);
if (destinationContent) {
FlowFile conFlowfile = session.write(original, (in, out) -> {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(objectMapper.writeValueAsBytes(formattedAttributes));
objectWriter.writeValue(outputStream, formattedAttributes);
}
});
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
session.transfer(conFlowfile, REL_SUCCESS);
} else {
FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(formattedAttributes));
FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, OBJECT_MAPPER.writeValueAsString(formattedAttributes));
session.transfer(atFlowfile, REL_SUCCESS);
}
} catch (JsonProcessingException e) {
@ -310,7 +325,7 @@ public class AttributesToJSON extends AbstractProcessor {
for (Map.Entry<String, Object> entry : flowFileAttributes.entrySet()) {
String value = (String) entry.getValue();
if (StringUtils.isNotBlank(value) && (isPossibleJsonArray(value) || isPossibleJsonObject(value))) {
formattedAttributes.put(entry.getKey(), objectMapper.readTree(value));
formattedAttributes.put(entry.getKey(), OBJECT_MAPPER.readTree(value));
} else {
formattedAttributes.put(entry.getKey(), value);
}