NIFI-4800 Expose the flattenMode as property in FlattenJSON processor

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2530.
This commit is contained in:
Deon Huang 2018-03-11 23:01:53 +08:00 committed by Pierre Villard
parent d78d95ad6f
commit 7aabb99bcc
2 changed files with 78 additions and 3 deletions

View File

@ -23,6 +23,7 @@ import com.github.wnameless.json.flattener.JsonFlattener;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
@ -43,10 +44,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@Tags({ "json", "flatten" })
@Tags({"json", "flatten"})
@CapabilityDescription(
"Provides the user with the ability to take a nested JSON document and flatten it into a simple key/value pair " +
"document. The keys are combined at each level with a user-defined separator that defaults to '.'"
"document. The keys are combined at each level with a user-defined separator that defaults to '.'. " +
"Support three kinds of flatten mode, normal, keep-arrays and dot notation for MongoDB query. " +
"Default flatten mode is 'keep-arrays'."
)
@SideEffectFree
public class FlattenJson extends AbstractProcessor {
@ -89,6 +92,25 @@ public class FlattenJson extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
public static final AllowableValue FLATTEN_MODE_NORMAL = new AllowableValue("normal", "normal",
"Flattens every objects into a single level json");
public static final AllowableValue FLATTEN_MODE_KEEP_ARRAYS = new AllowableValue("keep arrays", "keep arrays",
"Flattens every objects and keep arrays format");
public static final AllowableValue FLATTEN_MODE_DOT_NOTATION = new AllowableValue("dot notation", "dot notation",
"Conforms to MongoDB dot notation to update also nested documents");
public static final PropertyDescriptor FLATTEN_MODE = new PropertyDescriptor.Builder()
.name("flatten-mode")
.displayName("Flatten Mode")
.description("Specifies how json is flattened")
.defaultValue(FLATTEN_MODE_KEEP_ARRAYS.getValue())
.required(true)
.allowableValues(FLATTEN_MODE_NORMAL, FLATTEN_MODE_KEEP_ARRAYS, FLATTEN_MODE_DOT_NOTATION)
.expressionLanguageSupported(false)
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@ -96,6 +118,7 @@ public class FlattenJson extends AbstractProcessor {
protected void init(final ProcessorInitializationContext context) {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(SEPARATOR);
props.add(FLATTEN_MODE);
properties = Collections.unmodifiableList(props);
Set<Relationship> rels = new HashSet<>();
@ -122,8 +145,12 @@ public class FlattenJson extends AbstractProcessor {
return;
}
final String mode = context.getProperty(FLATTEN_MODE).getValue();
final FlattenMode flattenMode = getFlattenMode(mode);
String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
session.exportTo(flowFile, bos);
@ -131,7 +158,7 @@ public class FlattenJson extends AbstractProcessor {
String raw = new String(bos.toByteArray());
final String flattened = new JsonFlattener(raw)
.withFlattenMode(FlattenMode.KEEP_ARRAYS)
.withFlattenMode(flattenMode)
.withSeparator(separator.charAt(0))
.flatten();
@ -142,4 +169,14 @@ public class FlattenJson extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
}
}
private FlattenMode getFlattenMode(String mode) {
if (FLATTEN_MODE_NORMAL.getValue().equals(mode)) {
return FlattenMode.NORMAL;
} else if (FLATTEN_MODE_DOT_NOTATION.getValue().equals(mode)) {
return FlattenMode.MONGODB;
} else {
return FlattenMode.KEEP_ARRAYS;
}
}
}

View File

@ -134,4 +134,42 @@ class TestFlattenJson {
])
}
}
@Test
void testFlattenModeNormal() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
first: [
second: [
third: [
"one", "two", "three", "four", "five"
]
]
]
]))
testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_NORMAL)
baseTest(testRunner, json,5) { parsed ->
Assert.assertEquals("Separator not applied.", "one", parsed["first.second.third[0]"])
}
}
@Test
void testFlattenModeDotNotation() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
first: [
second: [
third: [
"one", "two", "three", "four", "five"
]
]
]
]))
testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_DOT_NOTATION)
baseTest(testRunner, json,5) { parsed ->
Assert.assertEquals("Separator not applied.", "one", parsed["first.second.third.0"])
}
}
}