NIFI-11830 Added Transformation Strategy property to JSLTTransformJSON

This closes #7509

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matt Burgess 2023-07-20 18:55:28 -04:00 committed by exceptionfactory
parent dbb2d6905f
commit d201119f0d
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 159 additions and 24 deletions

View File

@ -16,7 +16,11 @@
*/
package org.apache.nifi.processors.jslt;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
@ -35,6 +39,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@ -68,6 +73,9 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.jslt.JSLTTransformJSON.TransformationStrategy.EACH_OBJECT;
import static org.apache.nifi.processors.jslt.JSLTTransformJSON.TransformationStrategy.ENTIRE_FLOWFILE;
@SideEffectFree
@SupportsBatching
@Tags({"json", "jslt", "transform"})
@ -90,6 +98,15 @@ public class JSLTTransformJSON extends AbstractProcessor {
.required(true)
.build();
public static final PropertyDescriptor TRANSFORMATION_STRATEGY = new PropertyDescriptor.Builder()
.name("jslt-transform-transformation-strategy")
.displayName("Transformation Strategy")
.description("Whether to apply the JSLT transformation to the entire FlowFile contents or each JSON object in the root-level array")
.required(true)
.allowableValues(TransformationStrategy.class)
.defaultValue(EACH_OBJECT.getValue())
.build();
public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
.name("jslt-transform-pretty_print")
.displayName("Pretty Print")
@ -127,6 +144,7 @@ public class JSLTTransformJSON extends AbstractProcessor {
descriptors = Collections.unmodifiableList(
Arrays.asList(
JSLT_TRANSFORM,
TRANSFORMATION_STRATEGY,
PRETTY_PRINT,
TRANSFORM_CACHE_SIZE
)
@ -174,7 +192,6 @@ public class JSLTTransformJSON extends AbstractProcessor {
results.add(transformBuilder.build());
return results;
}
@OnScheduled
@ -202,40 +219,73 @@ public class JSLTTransformJSON extends AbstractProcessor {
return;
}
final TransformationStrategy transformationStrategy = TransformationStrategy.valueOf(context.getProperty(TRANSFORMATION_STRATEGY).getValue());
final StopWatch stopWatch = new StopWatch(true);
final JsonNode jsonNode;
try (final InputStream in = session.read(original)) {
jsonNode = readJson(in);
} catch (final Exception e) {
getLogger().error("JSLT Transform failed {}", original, e);
session.transfer(original, REL_FAILURE);
return;
}
final PropertyValue transformProperty = context.getProperty(JSLT_TRANSFORM);
FlowFile transformed;
final JsonFactory jsonFactory = new JsonFactory();
try {
final String transform = readTransform(transformProperty, original);
final Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
final boolean prettyPrint = context.getProperty(PRETTY_PRINT).asBoolean();
final JsonNode transformedJson = jsltExpression.apply(jsonNode);
final ObjectWriter writer = context.getProperty(PRETTY_PRINT).asBoolean() ? jsonObjectMapper.writerWithDefaultPrettyPrinter() : jsonObjectMapper.writer();
final Object outputObject;
if (transformedJson == null || transformedJson.isNull()) {
getLogger().warn("JSLT Transform resulted in no data {}", original);
outputObject = null;
} else {
outputObject = transformedJson;
}
FlowFile transformed = session.write(original, out -> {
if (outputObject != null) {
writer.writeValue(out, outputObject);
transformed = session.write(original, (inputStream, outputStream) -> {
boolean topLevelArray = false;
JsonParser jsonParser;
JsonNode firstJsonNode;
if (EACH_OBJECT.equals(transformationStrategy)) {
jsonParser = jsonFactory.createParser(inputStream);
jsonParser.setCodec(jsonObjectMapper);
JsonToken token = jsonParser.nextToken();
if (token == JsonToken.START_ARRAY) {
token = jsonParser.nextToken(); // advance to START_OBJECT token
topLevelArray = true;
}
if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also
firstJsonNode = jsonParser.readValueAsTree();
} else {
firstJsonNode = null;
}
} else {
firstJsonNode = readJson(inputStream);
jsonParser = null; // This will not be used when applying the transform to the entire FlowFile
}
final ObjectWriter writer = prettyPrint ? jsonObjectMapper.writerWithDefaultPrettyPrinter() : jsonObjectMapper.writer();
final JsonGenerator jsonGenerator = writer.createGenerator(outputStream);
Object outputObject;
JsonNode nextNode;
if (topLevelArray) {
jsonGenerator.writeStartArray();
}
nextNode = firstJsonNode;
do {
final JsonNode transformedJson = jsltExpression.apply(nextNode);
if (transformedJson == null || transformedJson.isNull()) {
getLogger().warn("JSLT Transform resulted in no data {}", original);
outputObject = null;
} else {
outputObject = transformedJson;
}
if (outputObject != null) {
jsonGenerator.writeObject(outputObject);
}
} while ((nextNode = getNextJsonNode(transformationStrategy, jsonParser)) != null);
if (topLevelArray) {
jsonGenerator.writeEndArray();
}
jsonGenerator.flush();
});
transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
stopWatch.stop();
getLogger().debug("JSLT Transform completed {}", original);
} catch (final Exception e) {
getLogger().error("JSLT Transform failed {}", original, e);
@ -245,7 +295,7 @@ public class JSLTTransformJSON extends AbstractProcessor {
@OnStopped
@OnShutdown
public void onStopped(ProcessContext context) {
public void onStopped() {
transformCache.cleanUp();
}
@ -277,4 +327,62 @@ public class JSLTTransformJSON extends AbstractProcessor {
throw new UncheckedIOException("Read JSLT Transform failed", e);
}
}
}
protected JsonNode getNextJsonNode(final TransformationStrategy transformationStrategy, final JsonParser jsonParser) throws IOException {
if (ENTIRE_FLOWFILE.equals(transformationStrategy)) {
return null;
}
return getJsonNode(jsonParser);
}
private JsonNode getJsonNode(JsonParser jsonParser) throws IOException {
while (true) {
final JsonToken token = jsonParser.nextToken();
if (token == null) {
return null;
}
switch (token) {
case START_ARRAY:
case END_ARRAY:
case END_OBJECT:
break;
case START_OBJECT:
return jsonParser.readValueAsTree();
default:
throw new IOException("Expected to get a JSON Object but got a token of type " + token.name());
}
}
}
enum TransformationStrategy implements DescribedValue {
ENTIRE_FLOWFILE("Entire FlowFile", "Apply transformation to entire FlowFile content JSON"),
EACH_OBJECT("Each JSON Object", "Apply transformation each JSON Object in an array");
private final String displayName;
private final String description;
TransformationStrategy(final String displayName, final String description) {
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}
}

View File

@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.nifi.processors.jslt.JSLTTransformJSON.TransformationStrategy.ENTIRE_FLOWFILE;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestJSLTTransformJSON {
@ -75,6 +76,7 @@ public class TestJSLTTransformJSON {
final String transformPath = transformUrl.getPath();
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transformPath);
runner.setProperty(JSLTTransformJSON.TRANSFORMATION_STRATEGY, ENTIRE_FLOWFILE.getValue());
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString());
final String json = getResource("input.json");
@ -104,6 +106,7 @@ public class TestJSLTTransformJSON {
final String inputFlowFile = getResource("input.json");
final String transform = getResource("expressionLanguageTransform.json");
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.TRANSFORMATION_STRATEGY, ENTIRE_FLOWFILE.getValue());
runner.assertValid();
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString());
Map<String, String> attrs = new HashMap<>();
@ -121,11 +124,26 @@ public class TestJSLTTransformJSON {
runTransform("inputArray.json", "arrayTransform.json", "arrayOutput.json");
}
@Test
public void testArrayJSLTPerObject() {
final String transform = getResource("arrayTransformPerObject.json");
final String json = getResource("inputArray.json");
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString());
runner.enqueue(json);
final MockFlowFile flowFile = assertRunSuccess();
final String expectedOutput = getResource("arrayOutput.json");
assertContentEquals(flowFile, expectedOutput);
}
@Test
public void testJSLTNoOutput() throws IOException {
final String input = "{\"a\":1}";
final String transform = ".b";
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.TRANSFORMATION_STRATEGY, ENTIRE_FLOWFILE.getValue());
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString());
runner.enqueue(input);
@ -156,6 +174,7 @@ public class TestJSLTTransformJSON {
final String transform = getResource(transformFileName);
final String json = getResource(jsonFileName);
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.TRANSFORMATION_STRATEGY, ENTIRE_FLOWFILE.getValue());
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString());
runner.enqueue(json);
}

View File

@ -0,0 +1,8 @@
{
"SecondaryRatings": {
"quality": {
"Value": .rating.primary.value,
"RatingRange": .rating.quality.value
}
}
}