diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java index 0b4890149d..9293e4c23f 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java @@ -20,14 +20,15 @@ import com.bazaarvoice.jolt.ContextualTransform; import com.bazaarvoice.jolt.JoltTransform; import com.bazaarvoice.jolt.JsonUtils; import com.bazaarvoice.jolt.Transform; +import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -60,7 +61,6 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StringUtils; -import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -89,6 +89,7 @@ import java.util.stream.Collectors; @CapabilityDescription("Applies a list of Jolt specifications to the FlowFile payload. A new FlowFile is created " + "with transformed content and is routed to the 'success' relationship. If the transform " + "fails, the original FlowFile is routed to the 'failure' relationship.") +@RequiresInstanceClassLoading public class JoltTransformRecord extends AbstractProcessor { static final AllowableValue SHIFTR @@ -151,8 +152,9 @@ public class JoltTransformRecord extends AbstractProcessor { .displayName("Custom Transformation Class Name") .description("Fully Qualified Class Name for Custom Transformation") .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dependsOn(JOLT_SPEC, CUSTOMR) .build(); static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder() @@ -163,6 +165,7 @@ public class JoltTransformRecord extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY) .dynamicallyModifiesClasspath(true) + .dependsOn(JOLT_SPEC, CUSTOMR) .build(); static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder() @@ -200,7 +203,7 @@ public class JoltTransformRecord extends AbstractProcessor { * For some cases the key could be empty. It means that it represents default transform (e.g. for custom transform * when there is no jolt-record-spec specified). */ - private LoadingCache, JoltTransform> transformCache; + private Cache, JoltTransform> transformCache; static { final List _properties = new ArrayList<>(); @@ -235,7 +238,6 @@ public class JoltTransformRecord extends AbstractProcessor { final List results = new ArrayList<>(super.customValidate(validationContext)); final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue(); final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue(); - if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) { if (!SORTR.getValue().equals(transform)) { final String message = "A specification is required for this transformation"; @@ -247,7 +249,7 @@ public class JoltTransformRecord extends AbstractProcessor { try { final String specValue = validationContext.getProperty(JOLT_SPEC).getValue(); - if (validationContext.isExpressionLanguagePresent(specValue)) { + if (validationContext.isExpressionLanguagePresent(specValue) ) { final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true); if (!StringUtils.isEmpty(invalidExpressionMsg)) { results.add(new ValidationResult.Builder().valid(false) @@ -265,6 +267,14 @@ public class JoltTransformRecord extends AbstractProcessor { results.add(new ValidationResult.Builder().valid(false) .explanation(customMessage) .build()); + } else if (validationContext.isExpressionLanguagePresent(customTransform)) { + final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true); + if (!StringUtils.isEmpty(invalidExpressionMsg)) { + results.add(new ValidationResult.Builder().valid(false) + .subject(CUSTOM_CLASS.getDisplayName()) + .explanation("Invalid Expression Language: " + invalidExpressionMsg) + .build()); + } } else { TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), customTransform, specJson); } @@ -273,7 +283,7 @@ public class JoltTransformRecord extends AbstractProcessor { } } } catch (final Exception e) { - getLogger().info("Processor is not valid - " + e.toString()); + getLogger().info("Processor is not valid - ", e); String message = "Specification not valid for the selected transformation."; results.add(new ValidationResult.Builder().valid(false) .explanation(message) @@ -443,7 +453,14 @@ public class JoltTransformRecord extends AbstractProcessor { specString = Optional.empty(); } - return transformCache.get(specString); + return transformCache.get(specString, currString -> { + try { + return createTransform(context, currString.orElse(null), flowFile); + } catch (Exception e) { + getLogger().error("Problem getting transform", e); + } + return null; + }); } @OnScheduled @@ -451,10 +468,10 @@ public class JoltTransformRecord extends AbstractProcessor { int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger(); transformCache = Caffeine.newBuilder() .maximumSize(maxTransformsToCache) - .build(specString -> createTransform(context, specString.orElse(null))); + .build(); } - private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception { + private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception { final Object specJson; if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET); @@ -463,16 +480,12 @@ public class JoltTransformRecord extends AbstractProcessor { } if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { - return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson); + return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(), specJson); } else { return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson); } } - protected FilenameFilter getJarFilenameFilter() { - return (dir, name) -> (name != null && name.endsWith(".jar")); - } - protected static Object transform(JoltTransform joltTransform, Object input) { return joltTransform instanceof ContextualTransform ? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input); diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java index 6c4188b997..288efe0692 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java @@ -39,6 +39,7 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; @@ -582,6 +583,33 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); new String(transformed.toByteArray())); } + @Test + public void testExpressionLanguageJarFile() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + URL t = getClass().getResource("/TestJoltTransformRecord/TestCustomJoltTransform.jar"); + assert t != null; + final String customJarPath = t.getPath(); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/customChainrSpec.json"))); + final String customJoltTransform = "TestCustomJoltTransform"; + final String customClass = "TestCustomJoltTransform"; + runner.setProperty(JoltTransformRecord.JOLT_SPEC, "${JOLT_SPEC}"); + runner.setProperty(JoltTransformRecord.MODULES, customJarPath); + runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "${CUSTOM_CLASS}"); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR); + runner.setVariable("CUSTOM_JAR", customJarPath); + Map customSpecs = new HashMap<>(); + customSpecs.put("JOLT_SPEC", spec); + customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform); + customSpecs.put("CUSTOM_CLASS", customClass); + runner.enqueue(new byte[0], customSpecs); + runner.assertValid(); + } + @Test public void testJoltSpecEL() throws IOException { generateTestData(1, null); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java index 7c7d649907..040a9bd36e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java @@ -18,13 +18,14 @@ package org.apache.nifi.processors.standard; import com.bazaarvoice.jolt.JoltTransform; import com.bazaarvoice.jolt.JsonUtils; +import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -73,6 +74,7 @@ import java.util.concurrent.TimeUnit; @CapabilityDescription("Applies a list of Jolt specifications to the flowfile JSON payload. A new FlowFile is created " + "with transformed content and is routed to the 'success' relationship. If the JSON transform " + "fails, the original FlowFile is routed to the 'failure' relationship.") +@RequiresInstanceClassLoading public class JoltTransformJSON extends AbstractProcessor { public static final AllowableValue SHIFTR = new AllowableValue("jolt-transform-shift", "Shift", "Shift input JSON/data to create the output JSON."); @@ -109,8 +111,9 @@ public class JoltTransformJSON extends AbstractProcessor { .displayName("Custom Transformation Class Name") .description("Fully Qualified Class Name for Custom Transformation") .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dependsOn(JOLT_SPEC, CUSTOMR) .build(); public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder() @@ -119,7 +122,9 @@ public class JoltTransformJSON extends AbstractProcessor { .description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).") .required(false) .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .dynamicallyModifiesClasspath(true) + .dependsOn(JOLT_SPEC, CUSTOMR) .build(); static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder() @@ -160,7 +165,7 @@ public class JoltTransformJSON extends AbstractProcessor { * For some cases the key could be empty. It means that it represents default transform (e.g. for custom transform * when there is no jolt-record-spec specified). */ - private LoadingCache, JoltTransform> transformCache; + private Cache, JoltTransform> transformCache; static { final List _properties = new ArrayList<>(); @@ -188,8 +193,6 @@ public class JoltTransformJSON extends AbstractProcessor { return properties; } - - @Override protected Collection customValidate(ValidationContext validationContext) { final List results = new ArrayList<>(super.customValidate(validationContext)); @@ -208,7 +211,7 @@ public class JoltTransformJSON extends AbstractProcessor { final ClassLoader customClassLoader; try { - if (modulePath != null) { + if (modulePath != null && !validationContext.isExpressionLanguagePresent(modulePath)) { customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter()); } else { customClassLoader = this.getClass().getClassLoader(); @@ -217,13 +220,21 @@ public class JoltTransformJSON extends AbstractProcessor { final String specValue = validationContext.getProperty(JOLT_SPEC).getValue(); if (validationContext.isExpressionLanguagePresent(specValue)) { - final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true); + final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true); if (!StringUtils.isEmpty(invalidExpressionMsg)) { results.add(new ValidationResult.Builder().valid(false) .subject(JOLT_SPEC.getDisplayName()) .explanation("Invalid Expression Language: " + invalidExpressionMsg) .build()); } + } else if (validationContext.isExpressionLanguagePresent(customTransform)) { + final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true); + if (!StringUtils.isEmpty(invalidExpressionMsg)) { + results.add(new ValidationResult.Builder().valid(false) + .subject(CUSTOM_CLASS.getDisplayName()) + .explanation("Invalid Expression Language: " + invalidExpressionMsg) + .build()); + } } else { //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{","\\\\\\\\\\$\\{"), DEFAULT_CHARSET); @@ -242,7 +253,7 @@ public class JoltTransformJSON extends AbstractProcessor { } } } catch (final Exception e) { - getLogger().info("Processor is not valid - " + e.toString()); + getLogger().error("processor is not valid: ", e); String message = "Specification not valid for the selected transformation." ; results.add(new ValidationResult.Builder().valid(false) .explanation(message) @@ -306,7 +317,7 @@ public class JoltTransformJSON extends AbstractProcessor { logger.info("Transformed {}", new Object[]{original}); } - private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception { + private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) { final Optional specString; if (context.getProperty(JOLT_SPEC).isSet()) { specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue()); @@ -314,7 +325,14 @@ public class JoltTransformJSON extends AbstractProcessor { specString = Optional.empty(); } - return transformCache.get(specString); + return transformCache.get(specString, currString -> { + try { + return createTransform(context, currString.orElse(null), flowFile); + } catch (Exception e) { + getLogger().error("Problem getting transform", e); + } + return null; + }); } @OnScheduled @@ -322,11 +340,15 @@ public class JoltTransformJSON extends AbstractProcessor { int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger(); transformCache = Caffeine.newBuilder() .maximumSize(maxTransformsToCache) - .build(specString -> createTransform(context, specString.orElse(null))); + .build(); try { if (context.getProperty(MODULES).isSet()) { - customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), this.getClass().getClassLoader(), getJarFilenameFilter()); + customClassLoader = ClassLoaderUtils.getCustomClassLoader( + context.getProperty(MODULES).evaluateAttributeExpressions().getValue(), + this.getClass().getClassLoader(), + getJarFilenameFilter() + ); } else { customClassLoader = this.getClass().getClassLoader(); } @@ -335,7 +357,7 @@ public class JoltTransformJSON extends AbstractProcessor { } } - private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception { + private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception { final Object specJson; if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET); @@ -344,7 +366,7 @@ public class JoltTransformJSON extends AbstractProcessor { } if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { - return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson); + return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(), specJson); } else { return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java index c54866240a..7e68c6b74a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java @@ -24,6 +24,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -361,6 +362,32 @@ public class TestJoltTransformJSON { assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); } + @Test + public void testExpressionLanguageJarFile() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON()); + final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar"; + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json"))); + final String customJoltTransform = "TestCustomJoltTransform"; + + Map customSpecs = new HashMap<>(); + customSpecs.put("JOLT_SPEC", spec); + customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform); + runner.setProperty(JoltTransformJSON.JOLT_SPEC, "${JOLT_SPEC}"); + runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"${CUSTOM_JOLT_CLASS}"); + runner.setProperty(JoltTransformJSON.MODULES, "${CUSTOM_JAR}"); + runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR); + runner.setVariable("CUSTOM_JAR", customJarPath); + runner.enqueue(JSON_INPUT, customSpecs); + runner.run(); + runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json"); + Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray())); + Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json"))); + assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); + } + @Test public void testTransformInputWithCustomTransformationWithDir() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());