diff --git a/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java b/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java index 26fec8e883..d1f3a9777b 100644 --- a/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java +++ b/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java @@ -16,7 +16,11 @@ */ package org.apache.nifi.processors.jslt; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; @@ -35,6 +39,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.DescribedValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -68,6 +73,9 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.nifi.processors.jslt.JSLTTransformJSON.TransformationStrategy.EACH_OBJECT; +import static org.apache.nifi.processors.jslt.JSLTTransformJSON.TransformationStrategy.ENTIRE_FLOWFILE; + @SideEffectFree @SupportsBatching @Tags({"json", "jslt", "transform"}) @@ -90,6 +98,15 @@ public class JSLTTransformJSON extends AbstractProcessor { .required(true) .build(); + public static final PropertyDescriptor TRANSFORMATION_STRATEGY = new PropertyDescriptor.Builder() + .name("jslt-transform-transformation-strategy") + .displayName("Transformation Strategy") + .description("Whether to apply the JSLT transformation to the entire FlowFile contents or each JSON object in the root-level array") + .required(true) + .allowableValues(TransformationStrategy.class) + .defaultValue(EACH_OBJECT.getValue()) + .build(); + public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder() .name("jslt-transform-pretty_print") .displayName("Pretty Print") @@ -127,6 +144,7 @@ public class JSLTTransformJSON extends AbstractProcessor { descriptors = Collections.unmodifiableList( Arrays.asList( JSLT_TRANSFORM, + TRANSFORMATION_STRATEGY, PRETTY_PRINT, TRANSFORM_CACHE_SIZE ) @@ -174,7 +192,6 @@ public class JSLTTransformJSON extends AbstractProcessor { results.add(transformBuilder.build()); return results; - } @OnScheduled @@ -202,40 +219,73 @@ public class JSLTTransformJSON extends AbstractProcessor { return; } + final TransformationStrategy transformationStrategy = TransformationStrategy.valueOf(context.getProperty(TRANSFORMATION_STRATEGY).getValue()); final StopWatch stopWatch = new StopWatch(true); - final JsonNode jsonNode; - try (final InputStream in = session.read(original)) { - jsonNode = readJson(in); - } catch (final Exception e) { - getLogger().error("JSLT Transform failed {}", original, e); - session.transfer(original, REL_FAILURE); - return; - } - final PropertyValue transformProperty = context.getProperty(JSLT_TRANSFORM); + FlowFile transformed; + final JsonFactory jsonFactory = new JsonFactory(); try { final String transform = readTransform(transformProperty, original); final Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform)); + final boolean prettyPrint = context.getProperty(PRETTY_PRINT).asBoolean(); - final JsonNode transformedJson = jsltExpression.apply(jsonNode); - final ObjectWriter writer = context.getProperty(PRETTY_PRINT).asBoolean() ? jsonObjectMapper.writerWithDefaultPrettyPrinter() : jsonObjectMapper.writer(); - final Object outputObject; - if (transformedJson == null || transformedJson.isNull()) { - getLogger().warn("JSLT Transform resulted in no data {}", original); - outputObject = null; - } else { - outputObject = transformedJson; - } - FlowFile transformed = session.write(original, out -> { - if (outputObject != null) { - writer.writeValue(out, outputObject); + transformed = session.write(original, (inputStream, outputStream) -> { + boolean topLevelArray = false; + JsonParser jsonParser; + JsonNode firstJsonNode; + if (EACH_OBJECT.equals(transformationStrategy)) { + jsonParser = jsonFactory.createParser(inputStream); + jsonParser.setCodec(jsonObjectMapper); + + JsonToken token = jsonParser.nextToken(); + if (token == JsonToken.START_ARRAY) { + token = jsonParser.nextToken(); // advance to START_OBJECT token + topLevelArray = true; + } + if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also + firstJsonNode = jsonParser.readValueAsTree(); + } else { + firstJsonNode = null; + } + } else { + firstJsonNode = readJson(inputStream); + jsonParser = null; // This will not be used when applying the transform to the entire FlowFile } + + final ObjectWriter writer = prettyPrint ? jsonObjectMapper.writerWithDefaultPrettyPrinter() : jsonObjectMapper.writer(); + final JsonGenerator jsonGenerator = writer.createGenerator(outputStream); + + Object outputObject; + JsonNode nextNode; + + if (topLevelArray) { + jsonGenerator.writeStartArray(); + } + nextNode = firstJsonNode; + do { + final JsonNode transformedJson = jsltExpression.apply(nextNode); + if (transformedJson == null || transformedJson.isNull()) { + getLogger().warn("JSLT Transform resulted in no data {}", original); + outputObject = null; + } else { + outputObject = transformedJson; + } + if (outputObject != null) { + jsonGenerator.writeObject(outputObject); + } + } while ((nextNode = getNextJsonNode(transformationStrategy, jsonParser)) != null); + if (topLevelArray) { + jsonGenerator.writeEndArray(); + } + jsonGenerator.flush(); }); + transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json"); session.transfer(transformed, REL_SUCCESS); session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + stopWatch.stop(); getLogger().debug("JSLT Transform completed {}", original); } catch (final Exception e) { getLogger().error("JSLT Transform failed {}", original, e); @@ -245,7 +295,7 @@ public class JSLTTransformJSON extends AbstractProcessor { @OnStopped @OnShutdown - public void onStopped(ProcessContext context) { + public void onStopped() { transformCache.cleanUp(); } @@ -277,4 +327,62 @@ public class JSLTTransformJSON extends AbstractProcessor { throw new UncheckedIOException("Read JSLT Transform failed", e); } } -} + + + protected JsonNode getNextJsonNode(final TransformationStrategy transformationStrategy, final JsonParser jsonParser) throws IOException { + + if (ENTIRE_FLOWFILE.equals(transformationStrategy)) { + return null; + } + return getJsonNode(jsonParser); + } + + private JsonNode getJsonNode(JsonParser jsonParser) throws IOException { + while (true) { + final JsonToken token = jsonParser.nextToken(); + if (token == null) { + return null; + } + + switch (token) { + case START_ARRAY: + case END_ARRAY: + case END_OBJECT: + break; + case START_OBJECT: + return jsonParser.readValueAsTree(); + default: + throw new IOException("Expected to get a JSON Object but got a token of type " + token.name()); + } + } + } + + enum TransformationStrategy implements DescribedValue { + ENTIRE_FLOWFILE("Entire FlowFile", "Apply transformation to entire FlowFile content JSON"), + EACH_OBJECT("Each JSON Object", "Apply transformation each JSON Object in an array"); + + private final String displayName; + + private final String description; + + TransformationStrategy(final String displayName, final String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/test/java/org/apache/nifi/processors/jslt/TestJSLTTransformJSON.java b/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/test/java/org/apache/nifi/processors/jslt/TestJSLTTransformJSON.java index 5a9f1d33dc..cdcf37e8df 100644 --- a/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/test/java/org/apache/nifi/processors/jslt/TestJSLTTransformJSON.java +++ b/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/test/java/org/apache/nifi/processors/jslt/TestJSLTTransformJSON.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import static org.apache.nifi.processors.jslt.JSLTTransformJSON.TransformationStrategy.ENTIRE_FLOWFILE; import static org.junit.jupiter.api.Assertions.assertEquals; public class TestJSLTTransformJSON { @@ -75,6 +76,7 @@ public class TestJSLTTransformJSON { final String transformPath = transformUrl.getPath(); runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transformPath); + runner.setProperty(JSLTTransformJSON.TRANSFORMATION_STRATEGY, ENTIRE_FLOWFILE.getValue()); runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString()); final String json = getResource("input.json"); @@ -104,6 +106,7 @@ public class TestJSLTTransformJSON { final String inputFlowFile = getResource("input.json"); final String transform = getResource("expressionLanguageTransform.json"); runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform); + runner.setProperty(JSLTTransformJSON.TRANSFORMATION_STRATEGY, ENTIRE_FLOWFILE.getValue()); runner.assertValid(); runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString()); Map attrs = new HashMap<>(); @@ -121,11 +124,26 @@ public class TestJSLTTransformJSON { runTransform("inputArray.json", "arrayTransform.json", "arrayOutput.json"); } + @Test + public void testArrayJSLTPerObject() { + final String transform = getResource("arrayTransformPerObject.json"); + final String json = getResource("inputArray.json"); + runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform); + runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString()); + runner.enqueue(json); + + final MockFlowFile flowFile = assertRunSuccess(); + + final String expectedOutput = getResource("arrayOutput.json"); + assertContentEquals(flowFile, expectedOutput); + } + @Test public void testJSLTNoOutput() throws IOException { final String input = "{\"a\":1}"; final String transform = ".b"; runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform); + runner.setProperty(JSLTTransformJSON.TRANSFORMATION_STRATEGY, ENTIRE_FLOWFILE.getValue()); runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString()); runner.enqueue(input); @@ -156,6 +174,7 @@ public class TestJSLTTransformJSON { final String transform = getResource(transformFileName); final String json = getResource(jsonFileName); runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform); + runner.setProperty(JSLTTransformJSON.TRANSFORMATION_STRATEGY, ENTIRE_FLOWFILE.getValue()); runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString()); runner.enqueue(json); } diff --git a/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/test/resources/arrayTransformPerObject.json b/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/test/resources/arrayTransformPerObject.json new file mode 100644 index 0000000000..c18274d7f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/test/resources/arrayTransformPerObject.json @@ -0,0 +1,8 @@ +{ + "SecondaryRatings": { + "quality": { + "Value": .rating.primary.value, + "RatingRange": .rating.quality.value + } + } +} \ No newline at end of file