NIFI-10661 Added support for File Resources to JSLTTransformJSON

- Refactored unit test methods for reuse of shared operations
- Added link to JSLT Tutorial in property description

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6546
This commit is contained in:
exceptionfactory 2022-10-18 09:26:57 -05:00 committed by Matthew Burgess
parent 7d6dc2cace
commit b64b4fcce5
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 168 additions and 122 deletions

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.schibsted.spt.data.jslt.Expression;
import com.schibsted.spt.data.jslt.JsltException;
import com.schibsted.spt.data.jslt.Parser;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -37,30 +36,37 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@SideEffectFree
@SupportsBatching
@ -76,9 +82,11 @@ public class JSLTTransformJSON extends AbstractProcessor {
public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
.name("jslt-transform-transformation")
.displayName("JSLT Transformation")
.description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
.description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied. " +
"The JSLT Tutorial provides an overview of supported expressions: https://github.com/schibsted/jslt/blob/master/tutorial.md")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.TEXT, ResourceType.FILE)
.required(true)
.build();
@ -111,32 +119,32 @@ public class JSLTTransformJSON extends AbstractProcessor {
.description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships;
private static final ObjectMapper jsonObjectMapper = new ObjectMapper();
/**
* A cache for transform objects. It keeps values indexed by JSLT specification string.
*/
static {
descriptors = Collections.unmodifiableList(
Arrays.asList(
JSLT_TRANSFORM,
PRETTY_PRINT,
TRANSFORM_CACHE_SIZE
)
);
relationships = Collections.unmodifiableSet(new LinkedHashSet<>(
Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)
));
}
private Cache<String, Expression> transformCache;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(JSLT_TRANSFORM);
descriptors.add(PRETTY_PRINT);
descriptors.add(TRANSFORM_CACHE_SIZE);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
return relationships;
}
@Override
@ -148,18 +156,23 @@ public class JSLTTransformJSON extends AbstractProcessor {
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
// If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
try {
Parser.compileString(transform);
} catch (JsltException je) {
results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
}
final ValidationResult.Builder transformBuilder = new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName());
final PropertyValue transformProperty = validationContext.getProperty(JSLT_TRANSFORM);
if (transformProperty.isExpressionLanguagePresent()) {
transformBuilder.valid(true);
} else {
// Expression Language is present, we won't know if the transform is valid until the EL is evaluated
results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
try {
final String transform = readTransform(transformProperty);
Parser.compileString(transform);
transformBuilder.valid(true);
} catch (final RuntimeException e) {
final String explanation = String.format("JSLT Transform not valid: %s", e.getMessage());
transformBuilder.valid(false).explanation(explanation);
}
}
results.add(transformBuilder.build());
return results;
}
@ -171,12 +184,13 @@ public class JSLTTransformJSON extends AbstractProcessor {
.maximumSize(maxTransformsToCache)
.build();
// Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
final PropertyValue transformProperty = context.getProperty(JSLT_TRANSFORM);
if (!transformProperty.isExpressionLanguagePresent()) {
try {
final String transform = readTransform(transformProperty);
transformCache.put(transform, Parser.compileString(transform));
} catch (JsltException je) {
throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
} catch (final RuntimeException e) {
throw new ProcessException("JSLT Transform compilation failed", e);
}
}
}
@ -188,27 +202,28 @@ public class JSLTTransformJSON extends AbstractProcessor {
return;
}
final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);
JsonNode firstJsonNode;
final JsonNode jsonNode;
try (final InputStream in = session.read(original)) {
firstJsonNode = readJson(in);
jsonNode = readJson(in);
} catch (final Exception e) {
logger.error("Failed to transform {}; routing to failure", original, e);
getLogger().error("JSLT Transform failed {}", original, e);
session.transfer(original, REL_FAILURE);
return;
}
try {
final String transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
final PropertyValue transformProperty = context.getProperty(JSLT_TRANSFORM);
final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
try {
final String transform = readTransform(transformProperty, original);
final Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
final JsonNode transformedJson = jsltExpression.apply(jsonNode);
final ObjectWriter writer = context.getProperty(PRETTY_PRINT).asBoolean() ? jsonObjectMapper.writerWithDefaultPrettyPrinter() : jsonObjectMapper.writer();
final Object outputObject;
if (transformedJson == null || transformedJson.isNull()) {
logger.warn("JSLT transform resulted in no data");
getLogger().warn("JSLT Transform resulted in no data {}", original);
outputObject = null;
} else {
outputObject = transformedJson;
@ -221,9 +236,9 @@ public class JSLTTransformJSON extends AbstractProcessor {
transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.debug("Transformed {}", original);
} catch (final Exception ex) {
logger.error("JSLT Transform failed {}", original, ex);
getLogger().debug("JSLT Transform completed {}", original);
} catch (final Exception e) {
getLogger().error("JSLT Transform failed {}", original, e);
session.transfer(original, REL_FAILURE);
}
}
@ -241,4 +256,25 @@ public class JSLTTransformJSON extends AbstractProcessor {
throw new IOException("Could not parse data as JSON", e);
}
}
/**
 * Resolves the JSLT transform text to use for the given FlowFile.
 *
 * When the property contains Expression Language, the expressions are evaluated against the
 * FlowFile attributes and the evaluated value itself is returned as the transform. Otherwise
 * the transform is read through the single-argument {@code readTransform(PropertyValue)}.
 *
 * @param propertyValue JSLT Transformation property value
 * @param flowFile FlowFile supplying attributes for Expression Language evaluation
 * @return transform text to be compiled
 */
private String readTransform(final PropertyValue propertyValue, final FlowFile flowFile) {
    if (!propertyValue.isExpressionLanguagePresent()) {
        return readTransform(propertyValue);
    }
    return propertyValue.evaluateAttributeExpressions(flowFile).getValue();
}
/**
 * Reads the complete JSLT transform from the resource referenced by the property value.
 *
 * The lines are re-joined with the system line separator. {@link BufferedReader#lines()} strips
 * line terminators, and joining without a separator would collapse the transform onto a single
 * line, causing a JSLT {@code //} line comment to swallow everything that follows it.
 *
 * @param propertyValue JSLT Transformation property value
 * @return transform text with line breaks preserved
 * @throws UncheckedIOException if the resource cannot be read
 */
private String readTransform(final PropertyValue propertyValue) {
    final ResourceReference resourceReference = propertyValue.asResource();
    // Decode explicitly as UTF-8 instead of relying on the platform default charset
    try (final BufferedReader reader = new BufferedReader(
            new InputStreamReader(resourceReference.read(), java.nio.charset.StandardCharsets.UTF_8))) {
        return reader.lines().collect(Collectors.joining(System.lineSeparator()));
    } catch (final IOException e) {
        throw new UncheckedIOException("Read JSLT Transform failed", e);
    }
}
}

View File

@ -22,17 +22,20 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class TestJSLTTransformJSON {
private final static Path JSON_INPUT = Paths.get("src/test/resources/input.json");
private TestRunner runner = TestRunners.newTestRunner(new JSLTTransformJSON());
@BeforeEach
@ -41,13 +44,15 @@ public class TestJSLTTransformJSON {
}
@Test
public void testBadInput() throws IOException {
public void testBadInput() {
final String inputFlowFile = "I am not JSON";
final String transform = new String(Files.readAllBytes(Paths.get("src/test/resources/simpleTransform.json")));
final String transform = getResource("simpleTransform.json");
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, "true");
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString());
runner.enqueue(inputFlowFile);
runner.run();
runner.assertTransferCount(JSLTTransformJSON.REL_SUCCESS, 0);
runner.assertTransferCount(JSLTTransformJSON.REL_FAILURE, 1);
}
@ -61,83 +66,55 @@ public class TestJSLTTransformJSON {
}
@Test
public void testSimpleJSLT() throws IOException {
final String inputFlowFile = new String(Files.readAllBytes(JSON_INPUT));
final String transform = new String(Files.readAllBytes(Paths.get("src/test/resources/simpleTransform.json")));
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, "true");
runner.enqueue(inputFlowFile);
runner.run();
runner.assertTransferCount(JSLTTransformJSON.REL_SUCCESS, 1);
runner.assertTransferCount(JSLTTransformJSON.REL_FAILURE, 0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(JSLTTransformJSON.REL_SUCCESS).get(0);
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/simpleOutput.json")));
flowFile.assertContentEquals(translateNewLines(expectedOutput));
// Verifies that the JSLT Transformation property accepts a file system path to the transform
public void testTransformFilePath() throws java.net.URISyntaxException {
    final URL transformUrl = Objects.requireNonNull(getClass().getResource("/simpleTransform.json"));
    // URL.getPath() keeps percent-encoding (for example %20 for spaces), which does not name a real file;
    // converting through a URI yields the decoded file system path
    final String transformPath = new java.io.File(transformUrl.toURI()).getAbsolutePath();
    runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transformPath);
    runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString());
    final String json = getResource("input.json");
    runner.enqueue(json);
    assertRunSuccess();
}
@Test
public void testTransform() throws IOException {
final String inputFlowFile = new String(Files.readAllBytes(JSON_INPUT));
final String transform = new String(Files.readAllBytes(Paths.get("src/test/resources/dynamicKeyTransform.json")));
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, "true");
runner.enqueue(inputFlowFile);
runner.run();
runner.assertTransferCount(JSLTTransformJSON.REL_SUCCESS, 1);
runner.assertTransferCount(JSLTTransformJSON.REL_FAILURE, 0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(JSLTTransformJSON.REL_SUCCESS).get(0);
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/dynamicKeyTransformOutput.json")));
flowFile.assertContentEquals(translateNewLines(expectedOutput));
public void testSimpleJSLT() {
runTransform("input.json", "simpleTransform.json", "simpleOutput.json");
}
@Test
public void testTransform() {
runTransform("input.json", "dynamicKeyTransform.json", "dynamicKeyTransformOutput.json");
}
// This test verifies the capability of JSLT to perform a "cardinality ONE" operation (i.e. get first element if array) like JOLT has
@Test
public void testCardinality() throws IOException {
final String inputFlowFile = new String(Files.readAllBytes(JSON_INPUT));
final String transform = new String(Files.readAllBytes(Paths.get("src/test/resources/cardinalityTransform.json")));
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, "true");
runner.enqueue(inputFlowFile);
runner.run();
runner.assertTransferCount(JSLTTransformJSON.REL_SUCCESS, 1);
runner.assertTransferCount(JSLTTransformJSON.REL_FAILURE, 0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(JSLTTransformJSON.REL_SUCCESS).get(0);
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/cardinalityOutput.json")));
flowFile.assertContentEquals(translateNewLines(expectedOutput));
public void testCardinality() {
runTransform("input.json", "cardinalityTransform.json", "cardinalityOutput.json");
}
@Test
public void testExpressionLanguageTransform() throws IOException {
final String inputFlowFile = new String(Files.readAllBytes(JSON_INPUT));
final String transform = new String(Files.readAllBytes(Paths.get("src/test/resources/expressionLanguageTransform.json")));
public void testExpressionLanguageTransform() {
final String inputFlowFile = getResource("input.json");
final String transform = getResource("expressionLanguageTransform.json");
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.assertValid();
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, "true");
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString());
Map<String, String> attrs = new HashMap<>();
attrs.put("rating.range", "RatingRange");
attrs.put("rating.quality", ".rating.quality.value");
runner.enqueue(inputFlowFile, attrs);
runner.run();
runner.assertTransferCount(JSLTTransformJSON.REL_SUCCESS, 1);
runner.assertTransferCount(JSLTTransformJSON.REL_FAILURE, 0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(JSLTTransformJSON.REL_SUCCESS).get(0);
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/simpleOutput.json")));
flowFile.assertContentEquals(translateNewLines(expectedOutput));
final MockFlowFile flowFile = assertRunSuccess();
final String expectedOutput = getResource("simpleOutput.json");
flowFile.assertContentEquals(expectedOutput);
}
@Test
public void testArrayJSLT() throws IOException {
final String inputFlowFile = new String(Files.readAllBytes(Paths.get("src/test/resources/inputArray.json")));
final String transform = new String(Files.readAllBytes(Paths.get("src/test/resources/arrayTransform.json")));
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, "true");
runner.enqueue(inputFlowFile);
runner.run();
runner.assertTransferCount(JSLTTransformJSON.REL_SUCCESS, 1);
runner.assertTransferCount(JSLTTransformJSON.REL_FAILURE, 0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(JSLTTransformJSON.REL_SUCCESS).get(0);
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/arrayOutput.json")));
flowFile.assertContentEquals(translateNewLines(expectedOutput));
public void testArrayJSLT() {
runTransform("inputArray.json", "arrayTransform.json", "arrayOutput.json");
}
@Test
@ -145,22 +122,55 @@ public class TestJSLTTransformJSON {
final String input = "{\"a\":1}";
final String transform = ".b";
runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, transform);
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, "true");
runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, Boolean.TRUE.toString());
runner.enqueue(input);
final MockFlowFile flowFile = assertRunSuccess();
flowFile.assertContentEquals(new byte[0]);
}
/**
 * Configures the processor with the named transform, enqueues the named input, runs the
 * processor and compares the successful FlowFile content with the named expected output.
 *
 * @param inputFileName classpath resource containing the input JSON
 * @param transformFileName classpath resource containing the JSLT transform
 * @param outputFileName classpath resource containing the expected output
 */
private void runTransform(final String inputFileName, final String transformFileName, final String outputFileName) {
    setTransformEnqueueJson(transformFileName, inputFileName);
    final MockFlowFile transformed = assertRunSuccess();
    transformed.assertContentEquals(getResource(outputFileName));
}
/**
 * Loads the transform and input JSON resources, sets the transform with pretty printing
 * enabled on the runner, and enqueues the JSON as a FlowFile.
 *
 * @param transformFileName classpath resource containing the JSLT transform
 * @param jsonFileName classpath resource containing the input JSON
 */
private void setTransformEnqueueJson(final String transformFileName, final String jsonFileName) {
    final String jsltTransform = getResource(transformFileName);
    final String inputJson = getResource(jsonFileName);

    final String prettyPrintEnabled = Boolean.TRUE.toString();
    runner.setProperty(JSLTTransformJSON.JSLT_TRANSFORM, jsltTransform);
    runner.setProperty(JSLTTransformJSON.PRETTY_PRINT, prettyPrintEnabled);

    runner.enqueue(inputJson);
}
private MockFlowFile assertRunSuccess() {
runner.run();
runner.assertTransferCount(JSLTTransformJSON.REL_SUCCESS, 1);
runner.assertTransferCount(JSLTTransformJSON.REL_FAILURE, 0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(JSLTTransformJSON.REL_SUCCESS).get(0);
flowFile.assertContentEquals(new byte[0]);
return runner.getFlowFilesForRelationship(JSLTTransformJSON.REL_SUCCESS).iterator().next();
}
/**
 * Reads a classpath resource from the root of the test resources as a String using the
 * system line separator.
 *
 * @param fileName resource file name relative to the classpath root
 * @return resource content with line breaks translated to the system line separator
 * @throws NullPointerException if the resource is not found
 * @throws UncheckedIOException if the resource cannot be read
 */
private String getResource(final String fileName) {
    final String path = String.format("/%s", fileName);
    try (
            final InputStream inputStream = Objects.requireNonNull(getClass().getResourceAsStream(path), "Resource not found");
            final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8))
    ) {
        // Join with a plain newline and let translateNewLines apply the system separator once;
        // joining with System.lineSeparator() first would turn "\r\n" into "\r\r\n" on Windows
        final String resource = reader.lines().collect(Collectors.joining("\n"));
        return translateNewLines(resource);
    } catch (final IOException e) {
        throw new UncheckedIOException(e);
    }
}
/*
* Translate newlines (resource files are expected to be stored in the codebase with *nix line endings) to the system's line separator (for example, to support Windows)
*/
private String translateNewLines(final String text) {
final String lineSeparator = System.getProperty("line.separator");
final Pattern pattern = Pattern.compile("\n", Pattern.MULTILINE);
final String translated = pattern.matcher(text).replaceAll(lineSeparator);
return translated;
return pattern.matcher(text).replaceAll(System.lineSeparator());
}
}