NIFI-9286: JOLT Expression Language

Fixes NIFI-6213 and adds in functionality to use expression language in class and module specification
NIFI-9286: adding JOLT unit tests
NIFI-9286: addressing PR feedback
Fixes a problem with the scope of the EL for module directory
NIFI-9286: alignment of JOLT processors
NIFI-9286: fix checkstyle

This closes #5444

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
levilentz 2021-10-05 14:46:13 -07:00 committed by Mike Thomsen
parent 7ef2fd2986
commit dfbf2e3cea
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
4 changed files with 123 additions and 33 deletions

View File

@ -20,14 +20,15 @@ import com.bazaarvoice.jolt.ContextualTransform;
import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.JsonUtils;
import com.bazaarvoice.jolt.Transform;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -60,7 +61,6 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -89,6 +89,7 @@ import java.util.stream.Collectors;
@CapabilityDescription("Applies a list of Jolt specifications to the FlowFile payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success' relationship. If the transform "
+ "fails, the original FlowFile is routed to the 'failure' relationship.")
@RequiresInstanceClassLoading
public class JoltTransformRecord extends AbstractProcessor {
static final AllowableValue SHIFTR
@ -151,8 +152,9 @@ public class JoltTransformRecord extends AbstractProcessor {
.displayName("Custom Transformation Class Name")
.description("Fully Qualified Class Name for Custom Transformation")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(JOLT_SPEC, CUSTOMR)
.build();
static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
@ -163,6 +165,7 @@ public class JoltTransformRecord extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
.dynamicallyModifiesClasspath(true)
.dependsOn(JOLT_SPEC, CUSTOMR)
.build();
static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
@ -200,7 +203,7 @@ public class JoltTransformRecord extends AbstractProcessor {
* For some cases the key could be empty. It means that it represents default transform (e.g. for custom transform
* when there is no jolt-record-spec specified).
*/
private LoadingCache<Optional<String>, JoltTransform> transformCache;
private Cache<Optional<String>, JoltTransform> transformCache;
static {
final List<PropertyDescriptor> _properties = new ArrayList<>();
@ -235,7 +238,6 @@ public class JoltTransformRecord extends AbstractProcessor {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
if (!SORTR.getValue().equals(transform)) {
final String message = "A specification is required for this transformation";
@ -247,7 +249,7 @@ public class JoltTransformRecord extends AbstractProcessor {
try {
final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
if (validationContext.isExpressionLanguagePresent(specValue)) {
if (validationContext.isExpressionLanguagePresent(specValue) ) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
@ -265,6 +267,14 @@ public class JoltTransformRecord extends AbstractProcessor {
results.add(new ValidationResult.Builder().valid(false)
.explanation(customMessage)
.build());
} else if (validationContext.isExpressionLanguagePresent(customTransform)) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
.subject(CUSTOM_CLASS.getDisplayName())
.explanation("Invalid Expression Language: " + invalidExpressionMsg)
.build());
}
} else {
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), customTransform, specJson);
}
@ -273,7 +283,7 @@ public class JoltTransformRecord extends AbstractProcessor {
}
}
} catch (final Exception e) {
getLogger().info("Processor is not valid - " + e.toString());
getLogger().info("Processor is not valid - ", e);
String message = "Specification not valid for the selected transformation.";
results.add(new ValidationResult.Builder().valid(false)
.explanation(message)
@ -443,7 +453,14 @@ public class JoltTransformRecord extends AbstractProcessor {
specString = Optional.empty();
}
return transformCache.get(specString);
return transformCache.get(specString, currString -> {
try {
return createTransform(context, currString.orElse(null), flowFile);
} catch (Exception e) {
getLogger().error("Problem getting transform", e);
}
return null;
});
}
@OnScheduled
@ -451,10 +468,10 @@ public class JoltTransformRecord extends AbstractProcessor {
int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
transformCache = Caffeine.newBuilder()
.maximumSize(maxTransformsToCache)
.build(specString -> createTransform(context, specString.orElse(null)));
.build();
}
private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception {
private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception {
final Object specJson;
if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
@ -463,16 +480,12 @@ public class JoltTransformRecord extends AbstractProcessor {
}
if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson);
return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(), specJson);
} else {
return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
}
}
protected FilenameFilter getJarFilenameFilter() {
return (dir, name) -> (name != null && name.endsWith(".jar"));
}
protected static Object transform(JoltTransform joltTransform, Object input) {
return joltTransform instanceof ContextualTransform
? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input);

View File

@ -39,6 +39,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
@ -582,6 +583,33 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
new String(transformed.toByteArray()));
}
@Test
public void testExpressionLanguageJarFile() throws IOException {
generateTestData(1, null);
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(writer, "Pretty Print JSON", "true");
runner.enableControllerService(writer);
URL t = getClass().getResource("/TestJoltTransformRecord/TestCustomJoltTransform.jar");
assert t != null;
final String customJarPath = t.getPath();
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/customChainrSpec.json")));
final String customJoltTransform = "TestCustomJoltTransform";
final String customClass = "TestCustomJoltTransform";
runner.setProperty(JoltTransformRecord.JOLT_SPEC, "${JOLT_SPEC}");
runner.setProperty(JoltTransformRecord.MODULES, customJarPath);
runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "${CUSTOM_CLASS}");
runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR);
runner.setVariable("CUSTOM_JAR", customJarPath);
Map<String, String> customSpecs = new HashMap<>();
customSpecs.put("JOLT_SPEC", spec);
customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform);
customSpecs.put("CUSTOM_CLASS", customClass);
runner.enqueue(new byte[0], customSpecs);
runner.assertValid();
}
@Test
public void testJoltSpecEL() throws IOException {
generateTestData(1, null);

View File

@ -18,13 +18,14 @@ package org.apache.nifi.processors.standard;
import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.JsonUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -73,6 +74,7 @@ import java.util.concurrent.TimeUnit;
@CapabilityDescription("Applies a list of Jolt specifications to the flowfile JSON payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success' relationship. If the JSON transform "
+ "fails, the original FlowFile is routed to the 'failure' relationship.")
@RequiresInstanceClassLoading
public class JoltTransformJSON extends AbstractProcessor {
public static final AllowableValue SHIFTR = new AllowableValue("jolt-transform-shift", "Shift", "Shift input JSON/data to create the output JSON.");
@ -109,8 +111,9 @@ public class JoltTransformJSON extends AbstractProcessor {
.displayName("Custom Transformation Class Name")
.description("Fully Qualified Class Name for Custom Transformation")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(JOLT_SPEC, CUSTOMR)
.build();
public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
@ -119,7 +122,9 @@ public class JoltTransformJSON extends AbstractProcessor {
.description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).")
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.dependsOn(JOLT_SPEC, CUSTOMR)
.build();
static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
@ -160,7 +165,7 @@ public class JoltTransformJSON extends AbstractProcessor {
* For some cases the key could be empty. It means that it represents default transform (e.g. for custom transform
* when there is no jolt-record-spec specified).
*/
private LoadingCache<Optional<String>, JoltTransform> transformCache;
private Cache<Optional<String>, JoltTransform> transformCache;
static {
final List<PropertyDescriptor> _properties = new ArrayList<>();
@ -188,8 +193,6 @@ public class JoltTransformJSON extends AbstractProcessor {
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
@ -208,7 +211,7 @@ public class JoltTransformJSON extends AbstractProcessor {
final ClassLoader customClassLoader;
try {
if (modulePath != null) {
if (modulePath != null && !validationContext.isExpressionLanguagePresent(modulePath)) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
} else {
customClassLoader = this.getClass().getClassLoader();
@ -217,13 +220,21 @@ public class JoltTransformJSON extends AbstractProcessor {
final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
if (validationContext.isExpressionLanguagePresent(specValue)) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true);
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
.subject(JOLT_SPEC.getDisplayName())
.explanation("Invalid Expression Language: " + invalidExpressionMsg)
.build());
}
} else if (validationContext.isExpressionLanguagePresent(customTransform)) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
.subject(CUSTOM_CLASS.getDisplayName())
.explanation("Invalid Expression Language: " + invalidExpressionMsg)
.build());
}
} else {
//for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{","\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
@ -242,7 +253,7 @@ public class JoltTransformJSON extends AbstractProcessor {
}
}
} catch (final Exception e) {
getLogger().info("Processor is not valid - " + e.toString());
getLogger().error("processor is not valid: ", e);
String message = "Specification not valid for the selected transformation." ;
results.add(new ValidationResult.Builder().valid(false)
.explanation(message)
@ -306,7 +317,7 @@ public class JoltTransformJSON extends AbstractProcessor {
logger.info("Transformed {}", new Object[]{original});
}
private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception {
private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) {
final Optional<String> specString;
if (context.getProperty(JOLT_SPEC).isSet()) {
specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
@ -314,7 +325,14 @@ public class JoltTransformJSON extends AbstractProcessor {
specString = Optional.empty();
}
return transformCache.get(specString);
return transformCache.get(specString, currString -> {
try {
return createTransform(context, currString.orElse(null), flowFile);
} catch (Exception e) {
getLogger().error("Problem getting transform", e);
}
return null;
});
}
@OnScheduled
@ -322,11 +340,15 @@ public class JoltTransformJSON extends AbstractProcessor {
int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
transformCache = Caffeine.newBuilder()
.maximumSize(maxTransformsToCache)
.build(specString -> createTransform(context, specString.orElse(null)));
.build();
try {
if (context.getProperty(MODULES).isSet()) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), this.getClass().getClassLoader(), getJarFilenameFilter());
customClassLoader = ClassLoaderUtils.getCustomClassLoader(
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
this.getClass().getClassLoader(),
getJarFilenameFilter()
);
} else {
customClassLoader = this.getClass().getClassLoader();
}
@ -335,7 +357,7 @@ public class JoltTransformJSON extends AbstractProcessor {
}
}
private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception {
private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception {
final Object specJson;
if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
@ -344,7 +366,7 @@ public class JoltTransformJSON extends AbstractProcessor {
}
if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson);
return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(), specJson);
} else {
return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
}

View File

@ -24,6 +24,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@ -361,6 +362,32 @@ public class TestJoltTransformJSON {
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}
@Test
public void testExpressionLanguageJarFile() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
final String customJoltTransform = "TestCustomJoltTransform";
Map<String, String> customSpecs = new HashMap<>();
customSpecs.put("JOLT_SPEC", spec);
customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform);
runner.setProperty(JoltTransformJSON.JOLT_SPEC, "${JOLT_SPEC}");
runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"${CUSTOM_JOLT_CLASS}");
runner.setProperty(JoltTransformJSON.MODULES, "${CUSTOM_JAR}");
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
runner.setVariable("CUSTOM_JAR", customJarPath);
runner.enqueue(JSON_INPUT, customSpecs);
runner.run();
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}
@Test
public void testTransformInputWithCustomTransformationWithDir() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());