diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index a96631ab75..2557d1a442 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -302,6 +302,11 @@ + + com.github.wnameless + json-flattener + 0.4.1 + org.apache.bval bval-jsr diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java new file mode 100644 index 0000000000..040615739a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.wnameless.json.flattener.FlattenMode; +import com.github.wnameless.json.flattener.JsonFlattener; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageCompiler; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({ "json", "flatten" }) +@CapabilityDescription( + "Provides the user with the ability to take a nested JSON document and flatten it into a simple key/value pair " + + "document. The keys are combined at each level with a user-defined separator that defaults to '.'" +) +@SideEffectFree +public class FlattenJson extends AbstractProcessor { + static final Relationship REL_SUCCESS = new Relationship.Builder() + .description("Successfully flattened files go to this relationship.") + .name("success") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .description("Files that cannot be flattened go to this relationship.") + .name("failure") + .build(); + + static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder() + .name("flatten-json-separator") + .displayName("Separator") + .defaultValue(".") + .description("The separator character used for joining keys. Must be a JSON-legal character.") + .addValidator((subject, input, context) -> { + if (context.isExpressionLanguagePresent(input)) { + ExpressionLanguageCompiler elc = context.newExpressionLanguageCompiler(); + final boolean validExpression = elc.isValidExpression(input); + return new ValidationResult.Builder().subject(subject).input(input) + .valid(validExpression).explanation(validExpression ? "": "Not a valid Expression").build(); + } + + boolean valid = input != null && input.length() == 1; + String message = !valid ? "The separator must be a single character in length." : ""; + + ObjectMapper mapper = new ObjectMapper(); + String test = String.format("{ \"prop%sprop\": \"test\" }", input); + try { + mapper.readValue(test, Map.class); + } catch (IOException e) { + message = e.getLocalizedMessage(); + valid = false; + } + + return new ValidationResult.Builder().subject(subject).input(input).valid(valid).explanation(message).build(); + }) + .expressionLanguageSupported(true) + .build(); + + private List properties; + private Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + List props = new ArrayList<>(); + props.add(SEPARATOR); + properties = Collections.unmodifiableList(props); + + Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + + relationships = Collections.unmodifiableSet(rels); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue(); + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + session.exportTo(flowFile, bos); + bos.close(); + + String raw = new String(bos.toByteArray()); + final String flattened = new JsonFlattener(raw) + .withFlattenMode(FlattenMode.KEEP_ARRAYS) + .withSeparator(separator.charAt(0)) + .flatten(); + + flowFile = session.write(flowFile, os -> os.write(flattened.getBytes())); + + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception ex) { + session.transfer(flowFile, REL_FAILURE); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index c95f964f25..3fb0de3405 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -33,6 +33,7 @@ org.apache.nifi.processors.standard.ExecuteProcess org.apache.nifi.processors.standard.ExtractText org.apache.nifi.processors.standard.FetchSFTP org.apache.nifi.processors.standard.FetchFile +org.apache.nifi.processors.standard.FlattenJson org.apache.nifi.processors.standard.GenerateFlowFile org.apache.nifi.processors.standard.GetFile org.apache.nifi.processors.standard.GetFTP diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy new file mode 100644 index 0000000000..b3192582ff --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard + +import groovy.json.JsonSlurper +import org.apache.nifi.util.TestRunners +import org.junit.Assert +import org.junit.Test +import static groovy.json.JsonOutput.prettyPrint +import static groovy.json.JsonOutput.toJson + +class TestFlattenJson { + @Test + void testFlatten() { + def testRunner = TestRunners.newTestRunner(FlattenJson.class) + def json = prettyPrint(toJson([ + test: [ + msg: "Hello, world" + ], + first: [ + second: [ + third: [ + "one", "two", "three", "four", "five" + ] + ] + ] + ])) + baseTest(testRunner, json, 2) { parsed -> + Assert.assertEquals("test.msg should exist, but doesn't", parsed["test.msg"], "Hello, world") + Assert.assertEquals("Three level block doesn't exist.", parsed["first.second.third"], [ + "one", "two", "three", "four", "five" + ]) + } + } + + void baseTest(testRunner, String json, int keyCount, Closure c) { + baseTest(testRunner, json, [:], keyCount, c); + } + + void baseTest(def testRunner, String json, Map attrs, int keyCount, Closure c) { + testRunner.enqueue(json, attrs) + testRunner.run(1, true) + testRunner.assertTransferCount(FlattenJson.REL_FAILURE, 0) + testRunner.assertTransferCount(FlattenJson.REL_SUCCESS, 1) + + def flowFiles = testRunner.getFlowFilesForRelationship(FlattenJson.REL_SUCCESS) + def content = testRunner.getContentAsByteArray(flowFiles[0]) + def asJson = new String(content) + def slurper = new JsonSlurper() + def parsed = slurper.parseText(asJson) as Map + + Assert.assertEquals("Too many keys", keyCount, parsed.size()) + c.call(parsed) + } + + @Test + void testFlattenRecordSet() { + def testRunner = TestRunners.newTestRunner(FlattenJson.class) + def json = prettyPrint(toJson([ + [ + first: [ + second: "Hello" + ] + ], + [ + first: [ + second: "World" + ] + ] + ])) + + def expected = ["Hello", "World"] + baseTest(testRunner, json, 2) { parsed -> + Assert.assertTrue("Not a list", parsed instanceof List) + 0.upto(parsed.size() - 1) { + Assert.assertEquals("Missing values.", parsed[it]["first.second"], expected[it]) + } + } + } + + @Test + void testDifferentSeparator() { + def testRunner = TestRunners.newTestRunner(FlattenJson.class) + def json = prettyPrint(toJson([ + first: [ + second: [ + third: [ + "one", "two", "three", "four", "five" + ] + ] + ] + ])) + testRunner.setProperty(FlattenJson.SEPARATOR, "_") + baseTest(testRunner, json, 1) { parsed -> + Assert.assertEquals("Separator not applied.", parsed["first_second_third"], [ + "one", "two", "three", "four", "five" + ]) + } + } + + @Test + void testExpressionLanguage() { + def testRunner = TestRunners.newTestRunner(FlattenJson.class) + def json = prettyPrint(toJson([ + first: [ + second: [ + third: [ + "one", "two", "three", "four", "five" + ] + ] + ] + ])) + + testRunner.setValidateExpressionUsage(true); + testRunner.setProperty(FlattenJson.SEPARATOR, '${separator.char}') + baseTest(testRunner, json, ["separator.char": "_"], 1) { parsed -> + Assert.assertEquals("Separator not applied.", parsed["first_second_third"], [ + "one", "two", "three", "four", "five" + ]) + } + } +}