NIFI-8613: Improve FlattenJson Processor

- Unflattening a flattened json
- Preserving primitive arrays such as strings, numbers, booleans and null in a nested json
- Logging errors when failure
- Pretty printing resulted json

This closes #5083

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mohammed Nadeem 2021-05-18 20:16:04 +05:30 committed by exceptionfactory
parent a3eaf0a37a
commit c113960b81
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 264 additions and 40 deletions

View File

@ -331,7 +331,7 @@
<artifactId>ParCEFone</artifactId> <artifactId>ParCEFone</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.github.wnameless</groupId> <groupId>com.github.wnameless.json</groupId>
<artifactId>json-flattener</artifactId> <artifactId>json-flattener</artifactId>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -20,6 +20,9 @@ package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.wnameless.json.flattener.FlattenMode; import com.github.wnameless.json.flattener.FlattenMode;
import com.github.wnameless.json.flattener.JsonFlattener; import com.github.wnameless.json.flattener.JsonFlattener;
import com.github.wnameless.json.flattener.PrintMode;
import com.github.wnameless.json.unflattener.JsonUnflattener;
import org.apache.commons.io.IOUtils;
import org.apache.commons.text.StringEscapeUtils; import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -40,9 +43,10 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -54,24 +58,32 @@ import java.util.Set;
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@SideEffectFree @SideEffectFree
@Tags({"json", "flatten"}) @Tags({"json", "flatten", "unflatten"})
@CapabilityDescription( @CapabilityDescription(
"Provides the user with the ability to take a nested JSON document and flatten it into a simple key/value pair " + "Provides the user with the ability to take a nested JSON document and flatten it into a simple key/value pair " +
"document. The keys are combined at each level with a user-defined separator that defaults to '.'. " + "document. The keys are combined at each level with a user-defined separator that defaults to '.'. " +
"Support three kinds of flatten mode, normal, keep-arrays and dot notation for MongoDB query. " + "This Processor also allows to unflatten back the flattened json. It supports four kinds of flatten mode " +
"Default flatten mode is 'keep-arrays'." "such as normal, keep-arrays, dot notation for MongoDB query and keep-primitive-arrays. Default flatten mode " +
"is 'keep-arrays'."
) )
public class FlattenJson extends AbstractProcessor { public class FlattenJson extends AbstractProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("Successfully flattened files go to this relationship.")
.name("success")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.description("Files that cannot be flattened go to this relationship.")
.name("failure")
.build();
static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder() public static final String RETURN_TYPE_FLATTEN = "flatten";
public static final String RETURN_TYPE_UNFLATTEN = "unflatten";
public static final AllowableValue FLATTEN_MODE_NORMAL = new AllowableValue("normal", "normal",
"Flattens every objects into a single level json");
public static final AllowableValue FLATTEN_MODE_KEEP_ARRAYS = new AllowableValue("keep arrays", "keep arrays",
"Flattens every objects and keep arrays format");
public static final AllowableValue FLATTEN_MODE_DOT_NOTATION = new AllowableValue("dot notation", "dot notation",
"Conforms to MongoDB dot notation to update also nested documents");
public static final AllowableValue FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS = new AllowableValue("keep primitive arrays", "keep primitive arrays",
"Flattens every objects except arrays which contain only primitive types (strings, numbers, booleans and null)");
public static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder()
.name("flatten-json-separator") .name("flatten-json-separator")
.displayName("Separator") .displayName("Separator")
.defaultValue(".") .defaultValue(".")
@ -101,25 +113,56 @@ public class FlattenJson extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build(); .build();
public static final AllowableValue FLATTEN_MODE_NORMAL = new AllowableValue("normal", "normal",
"Flattens every objects into a single level json");
public static final AllowableValue FLATTEN_MODE_KEEP_ARRAYS = new AllowableValue("keep arrays", "keep arrays",
"Flattens every objects and keep arrays format");
public static final AllowableValue FLATTEN_MODE_DOT_NOTATION = new AllowableValue("dot notation", "dot notation",
"Conforms to MongoDB dot notation to update also nested documents");
public static final PropertyDescriptor FLATTEN_MODE = new PropertyDescriptor.Builder() public static final PropertyDescriptor FLATTEN_MODE = new PropertyDescriptor.Builder()
.name("flatten-mode") .name("flatten-mode")
.displayName("Flatten Mode") .displayName("Flatten Mode")
.description("Specifies how json is flattened") .description("Specifies how json should be flattened/unflattened")
.defaultValue(FLATTEN_MODE_KEEP_ARRAYS.getValue()) .defaultValue(FLATTEN_MODE_KEEP_ARRAYS.getValue())
.required(true) .required(true)
.allowableValues(FLATTEN_MODE_NORMAL, FLATTEN_MODE_KEEP_ARRAYS, FLATTEN_MODE_DOT_NOTATION) .allowableValues(FLATTEN_MODE_NORMAL, FLATTEN_MODE_KEEP_ARRAYS, FLATTEN_MODE_DOT_NOTATION, FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS)
.expressionLanguageSupported(ExpressionLanguageScope.NONE) .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build(); .build();
public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder()
.name("flatten-json-return-type")
.displayName("Return Type")
.description("Specifies the desired return type of json such as flatten/unflatten")
.defaultValue(RETURN_TYPE_FLATTEN)
.required(true)
.allowableValues(RETURN_TYPE_FLATTEN, RETURN_TYPE_UNFLATTEN)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("flatten-json-character-set")
.displayName("Character Set")
.description("The Character Set in which file is encoded")
.defaultValue("UTF-8")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
.name("flatten-json-pretty-print-json")
.displayName("Pretty Print JSON")
.description("Specifies whether or not resulted json should be pretty printed")
.defaultValue("false")
.required(true)
.allowableValues("true", "false")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("Successfully flattened/unflattened files go to this relationship.")
.name("success")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("Files that cannot be flattened/unflattened go to this relationship.")
.name("failure")
.build();
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private Set<Relationship> relationships; private Set<Relationship> relationships;
@ -128,6 +171,9 @@ public class FlattenJson extends AbstractProcessor {
List<PropertyDescriptor> props = new ArrayList<>(); List<PropertyDescriptor> props = new ArrayList<>();
props.add(SEPARATOR); props.add(SEPARATOR);
props.add(FLATTEN_MODE); props.add(FLATTEN_MODE);
props.add(RETURN_TYPE);
props.add(CHARACTER_SET);
props.add(PRETTY_PRINT);
properties = Collections.unmodifiableList(props); properties = Collections.unmodifiableList(props);
Set<Relationship> rels = new HashSet<>(); Set<Relationship> rels = new HashSet<>();
@ -157,25 +203,36 @@ public class FlattenJson extends AbstractProcessor {
final String mode = context.getProperty(FLATTEN_MODE).getValue(); final String mode = context.getProperty(FLATTEN_MODE).getValue();
final FlattenMode flattenMode = getFlattenMode(mode); final FlattenMode flattenMode = getFlattenMode(mode);
String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue(); final Character separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue().charAt(0);
final String returnType = context.getProperty(RETURN_TYPE).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final PrintMode printMode = context.getProperty(PRETTY_PRINT).asBoolean() ? PrintMode.PRETTY : PrintMode.MINIMAL;
try { try {
ByteArrayOutputStream bos = new ByteArrayOutputStream(); final StringBuilder contents = new StringBuilder();
session.exportTo(flowFile, bos); session.read(flowFile, in -> contents.append(IOUtils.toString(in, charset)));
bos.close();
String raw = new String(bos.toByteArray()); final String resultedJson;
final String flattened = new JsonFlattener(raw) if (returnType.equals(RETURN_TYPE_FLATTEN)) {
.withFlattenMode(flattenMode) resultedJson = new JsonFlattener(contents.toString())
.withSeparator(separator.charAt(0)) .withFlattenMode(flattenMode)
.withStringEscapePolicy(() -> StringEscapeUtils.ESCAPE_JSON) .withSeparator(separator)
.flatten(); .withStringEscapePolicy(() -> StringEscapeUtils.ESCAPE_JSON)
.withPrintMode(printMode)
.flatten();
} else {
resultedJson = new JsonUnflattener(contents.toString())
.withFlattenMode(flattenMode)
.withSeparator(separator)
.withPrintMode(printMode)
.unflatten();
}
flowFile = session.write(flowFile, os -> os.write(flattened.getBytes())); flowFile = session.write(flowFile, out -> out.write(resultedJson.getBytes(charset)));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} catch (Exception ex) { } catch (Exception e) {
getLogger().error("Failed to {} JSON", returnType, e);
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} }
} }
@ -185,6 +242,8 @@ public class FlattenJson extends AbstractProcessor {
return FlattenMode.NORMAL; return FlattenMode.NORMAL;
} else if (FLATTEN_MODE_DOT_NOTATION.getValue().equals(mode)) { } else if (FLATTEN_MODE_DOT_NOTATION.getValue().equals(mode)) {
return FlattenMode.MONGODB; return FlattenMode.MONGODB;
} else if (FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS.getValue().equals(mode)) {
return FlattenMode.KEEP_PRIMITIVE_ARRAYS;
} else { } else {
return FlattenMode.KEEP_ARRAYS; return FlattenMode.KEEP_ARRAYS;
} }

View File

@ -154,6 +154,77 @@ class TestFlattenJson {
} }
} }
@Test
void testFlattenModeKeepArrays() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
first: [
second: [
[
x: 1,
y: 2,
z: [3, 4, 5]
],
[ 6, 7, 8],
[
[9, 10],
11,
12
]
],
"third" : [
a: "b",
c: "d",
e: "f"
]
]
]))
testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_ARRAYS)
baseTest(testRunner, json,4) { parsed ->
assert parsed["first.second"] instanceof List // [{x=1, y=2, z=[3, 4, 5]}, [6, 7, 8], [[9, 10], 11, 12]]
assert parsed["first.second"][1] == [6, 7, 8]
Assert.assertEquals("Separator not applied.", "b", parsed["first.third.a"])
}
}
@Test
void testFlattenModeKeepPrimitiveArrays() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
first: [
second: [
[
x: 1,
y: 2,
z: [3, 4, 5]
],
[ 6, 7, 8],
[
[9, 10],
11,
12
]
],
"third" : [
a: "b",
c: "d",
e: "f"
]
]
]))
testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS)
baseTest(testRunner, json,10) { parsed ->
Assert.assertEquals("Separator not applied.", 1, parsed["first.second[0].x"])
Assert.assertEquals("Separator not applied.", [3, 4, 5], parsed["first.second[0].z"])
Assert.assertEquals("Separator not applied.", [9, 10], parsed["first.second[2][0]"])
Assert.assertEquals("Separator not applied.", 11, parsed["first.second[2][1]"])
Assert.assertEquals("Separator not applied.", 12, parsed["first.second[2][2]"])
Assert.assertEquals("Separator not applied.", "d", parsed["first.third.c"])
}
}
@Test @Test
void testFlattenModeDotNotation() { void testFlattenModeDotNotation() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class) def testRunner = TestRunners.newTestRunner(FlattenJson.class)
@ -203,4 +274,98 @@ class TestFlattenJson {
Assert.assertEquals("Separator not applied.", "José", parsed["name"]) Assert.assertEquals("Separator not applied.", "José", parsed["name"])
} }
} }
@Test
void testUnFlatten() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
"test.msg": "Hello, world",
"first.second.third": [ "one", "two", "three", "four", "five" ]
]))
testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
baseTest(testRunner, json, 2) { parsed ->
assert parsed.test instanceof Map
assert parsed.test.msg == "Hello, world"
assert parsed.first.second.third == [ "one", "two", "three", "four", "five" ]
}
}
@Test
void testUnFlattenWithDifferentSeparator() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
"first_second_third": [ "one", "two", "three", "four", "five" ]
]))
testRunner.setProperty(FlattenJson.SEPARATOR, "_")
testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
baseTest(testRunner, json, 1) { parsed ->
assert parsed.first instanceof Map
assert parsed.first.second.third == [ "one", "two", "three", "four", "five" ]
}
}
@Test
void testUnFlattenForKeepArraysMode() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
"a.b": 1,
"a.c": [
false,
["i.j": [ false, true, "xy" ] ]
]
]))
testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_ARRAYS)
testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
baseTest(testRunner, json, 1) { parsed ->
assert parsed.a instanceof Map
assert parsed.a.b == 1
assert parsed.a.c[0] == false
assert parsed.a.c[1].i instanceof Map
assert parsed.a.c[1].i.j == [false, true, "xy"]
}
}
@Test
void testUnFlattenForKeepPrimitiveArraysMode() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
"first.second[0].x": 1,
"first.second[0].y": 2,
"first.second[0].z": [3, 4, 5],
"first.second[1]": [6, 7, 8],
"first.second[2][0]": [9, 10],
"first.second[2][1]": 11,
"first.second[2][2]": 12,
"first.third.a": "b",
"first.third.c": "d",
"first.third.e": "f"
]))
testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS)
testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
baseTest(testRunner, json, 1) { parsed ->
assert parsed.first instanceof Map
assert parsed.first.second[0].x == 1
assert parsed.first.second[2][0] == [9, 10]
assert parsed.first.third.c == "d"
}
}
@Test
void testUnFlattenForDotNotationMode() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
"first.second.third.0": ["one", "two", "three", "four", "five"]
]))
testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_DOT_NOTATION)
testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
baseTest(testRunner, json,1) { parsed ->
assert parsed.first instanceof Map
assert parsed.first.second.third[0] == ["one", "two", "three", "four", "five"]
}
}
} }

View File

@ -305,9 +305,9 @@
<version>1.2.8</version> <version>1.2.8</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.github.wnameless</groupId> <groupId>com.github.wnameless.json</groupId>
<artifactId>json-flattener</artifactId> <artifactId>json-flattener</artifactId>
<version>0.7.1</version> <version>0.12.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.bval</groupId> <groupId>org.apache.bval</groupId>