NIFI-4649 Added FlattenJson processor.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2307

Replaced star imports, removed unused import

Added explanation for invalid Expression
This commit is contained in:
Mike Thomsen 2017-11-30 07:23:53 -05:00 committed by Matthew Burgess
parent 89fb1b37d9
commit d9866c75e2
4 changed files with 288 additions and 0 deletions

View File

@ -302,6 +302,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.wnameless</groupId>
<artifactId>json-flattener</artifactId>
<version>0.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.bval</groupId>
<artifactId>bval-jsr</artifactId>

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.wnameless.json.flattener.FlattenMode;
import com.github.wnameless.json.flattener.JsonFlattener;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Tags({ "json", "flatten" })
@CapabilityDescription(
"Provides the user with the ability to take a nested JSON document and flatten it into a simple key/value pair " +
"document. The keys are combined at each level with a user-defined separator that defaults to '.'"
)
@SideEffectFree
public class FlattenJson extends AbstractProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("Successfully flattened files go to this relationship.")
.name("success")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.description("Files that cannot be flattened go to this relationship.")
.name("failure")
.build();
static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder()
.name("flatten-json-separator")
.displayName("Separator")
.defaultValue(".")
.description("The separator character used for joining keys. Must be a JSON-legal character.")
.addValidator((subject, input, context) -> {
if (context.isExpressionLanguagePresent(input)) {
ExpressionLanguageCompiler elc = context.newExpressionLanguageCompiler();
final boolean validExpression = elc.isValidExpression(input);
return new ValidationResult.Builder().subject(subject).input(input)
.valid(validExpression).explanation(validExpression ? "": "Not a valid Expression").build();
}
boolean valid = input != null && input.length() == 1;
String message = !valid ? "The separator must be a single character in length." : "";
ObjectMapper mapper = new ObjectMapper();
String test = String.format("{ \"prop%sprop\": \"test\" }", input);
try {
mapper.readValue(test, Map.class);
} catch (IOException e) {
message = e.getLocalizedMessage();
valid = false;
}
return new ValidationResult.Builder().subject(subject).input(input).valid(valid).explanation(message).build();
})
.expressionLanguageSupported(true)
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(SEPARATOR);
properties = Collections.unmodifiableList(props);
Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
session.exportTo(flowFile, bos);
bos.close();
String raw = new String(bos.toByteArray());
final String flattened = new JsonFlattener(raw)
.withFlattenMode(FlattenMode.KEEP_ARRAYS)
.withSeparator(separator.charAt(0))
.flatten();
flowFile = session.write(flowFile, os -> os.write(flattened.getBytes()));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception ex) {
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -33,6 +33,7 @@ org.apache.nifi.processors.standard.ExecuteProcess
org.apache.nifi.processors.standard.ExtractText
org.apache.nifi.processors.standard.FetchSFTP
org.apache.nifi.processors.standard.FetchFile
org.apache.nifi.processors.standard.FlattenJson
org.apache.nifi.processors.standard.GenerateFlowFile
org.apache.nifi.processors.standard.GetFile
org.apache.nifi.processors.standard.GetFTP

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard
import groovy.json.JsonSlurper
import org.apache.nifi.util.TestRunners
import org.junit.Assert
import org.junit.Test
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
/**
 * Tests for the FlattenJson processor: nested objects, a top-level array of
 * records, a custom literal separator, and a separator supplied through
 * Expression Language.
 */
class TestFlattenJson {
    @Test
    void testFlatten() {
        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
        def json = prettyPrint(toJson([
            test: [
                msg: "Hello, world"
            ],
            first: [
                second: [
                    third: [
                        "one", "two", "three", "four", "five"
                    ]
                ]
            ]
        ]))
        baseTest(testRunner, json, 2) { parsed ->
            // JUnit order is (message, expected, actual).
            Assert.assertEquals("test.msg should exist, but doesn't", "Hello, world", parsed["test.msg"])
            Assert.assertEquals("Three level block doesn't exist.", [
                "one", "two", "three", "four", "five"
            ], parsed["first.second.third"])
        }
    }

    /** Convenience overload that enqueues the JSON without any FlowFile attributes. */
    void baseTest(testRunner, String json, int keyCount, Closure c) {
        baseTest(testRunner, json, [:], keyCount, c)
    }

    /**
     * Runs the processor once on the given JSON, asserts exactly one FlowFile was
     * routed to success and none to failure, parses the output, checks its size,
     * and hands the parsed result to the supplied closure for further assertions.
     *
     * @param testRunner runner already configured for FlattenJson
     * @param json       input document to enqueue
     * @param attrs      FlowFile attributes to enqueue with the document
     * @param keyCount   expected size of the parsed output
     * @param c          closure receiving the parsed output
     */
    void baseTest(def testRunner, String json, Map attrs, int keyCount, Closure c) {
        testRunner.enqueue(json, attrs)
        testRunner.run(1, true)
        testRunner.assertTransferCount(FlattenJson.REL_FAILURE, 0)
        testRunner.assertTransferCount(FlattenJson.REL_SUCCESS, 1)

        def flowFiles = testRunner.getFlowFilesForRelationship(FlattenJson.REL_SUCCESS)
        def content = testRunner.getContentAsByteArray(flowFiles[0])
        def asJson = new String(content)
        def slurper = new JsonSlurper()
        // NOTE(review): testFlattenRecordSet feeds a top-level array through this coercion and then
        // asserts the result is a List; presumably Groovy's `as Map` keeps it list-like — confirm.
        def parsed = slurper.parseText(asJson) as Map
        Assert.assertEquals("Unexpected number of keys", keyCount, parsed.size())
        c.call(parsed)
    }

    @Test
    void testFlattenRecordSet() {
        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
        def json = prettyPrint(toJson([
            [
                first: [
                    second: "Hello"
                ]
            ],
            [
                first: [
                    second: "World"
                ]
            ]
        ]))

        def expected = ["Hello", "World"]
        baseTest(testRunner, json, 2) { parsed ->
            Assert.assertTrue("Not a list", parsed instanceof List)
            0.upto(parsed.size() - 1) {
                Assert.assertEquals("Missing values.", expected[it], parsed[it]["first.second"])
            }
        }
    }

    @Test
    void testDifferentSeparator() {
        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
        def json = prettyPrint(toJson([
            first: [
                second: [
                    third: [
                        "one", "two", "three", "four", "five"
                    ]
                ]
            ]
        ]))
        testRunner.setProperty(FlattenJson.SEPARATOR, "_")
        baseTest(testRunner, json, 1) { parsed ->
            Assert.assertEquals("Separator not applied.", [
                "one", "two", "three", "four", "five"
            ], parsed["first_second_third"])
        }
    }

    @Test
    void testExpressionLanguage() {
        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
        def json = prettyPrint(toJson([
            first: [
                second: [
                    third: [
                        "one", "two", "three", "four", "five"
                    ]
                ]
            ]
        ]))

        testRunner.setValidateExpressionUsage(true)
        // Single-quoted so Groovy does not interpolate; the processor resolves it from the attribute below.
        testRunner.setProperty(FlattenJson.SEPARATOR, '${separator.char}')
        baseTest(testRunner, json, ["separator.char": "_"], 1) { parsed ->
            Assert.assertEquals("Separator not applied.", [
                "one", "two", "three", "four", "five"
            ], parsed["first_second_third"])
        }
    }
}