From 991e5e24dece9f1015ef56b15a152fbdeeaa78d8 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Mon, 10 Feb 2020 13:02:49 -0500 Subject: [PATCH] NIFI-4957 Add Resource File Support for Jolt Specifications This closes #4044 Signed-off-by: David Handermann --- .../jolt/record/JoltTransformRecord.java | 124 ++++++++++------ .../jolt/record/TestJoltTransformRecord.java | 73 ++++++++-- .../standard/JoltTransformJSON.java | 135 +++++++++++------- .../standard/TestJoltTransformJSON.java | 60 +++++++- 4 files changed, 277 insertions(+), 115 deletions(-) diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java index 9293e4c23f..0467cf1b2c 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java @@ -34,9 +34,11 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceReference; import org.apache.nifi.components.resource.ResourceType; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -60,10 +62,15 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import java.io.BufferedReader; +import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -86,7 +93,7 @@ import java.util.stream.Collectors; @WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"), @WritesAttribute(attribute = "mime.type", description = "The MIME Type that the configured Record Writer indicates is appropriate"), }) -@CapabilityDescription("Applies a list of Jolt specifications to the FlowFile payload. A new FlowFile is created " +@CapabilityDescription("Applies a JOLT specification to each record in the FlowFile payload. A new FlowFile is created " + "with transformed content and is routed to the 'success' relationship. If the transform " + "fails, the original FlowFile is routed to the 'failure' relationship.") @RequiresInstanceClassLoading @@ -141,9 +148,11 @@ public class JoltTransformRecord extends AbstractProcessor { static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder() .name("jolt-record-spec") .displayName("Jolt Specification") - .description("Jolt Specification for transform of record data. This value is ignored if the Jolt Sort Transformation is selected.") + .description("Jolt Specification for transform of record data. The value for this property may be the text of a JOLT specification " + + "or the path to a file containing a JOLT specification. This value is ignored if the Jolt Sort Transformation is selected.") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT) .required(false) .build(); @@ -238,19 +247,26 @@ public class JoltTransformRecord extends AbstractProcessor { final List results = new ArrayList<>(super.customValidate(validationContext)); final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue(); final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue(); - if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) { - if (!SORTR.getValue().equals(transform)) { - final String message = "A specification is required for this transformation"; - results.add(new ValidationResult.Builder().valid(false) - .explanation(message) - .build()); - } - } else { - try { - final String specValue = validationContext.getProperty(JOLT_SPEC).getValue(); + final String modulePath = validationContext.getProperty(MODULES).isSet()? validationContext.getProperty(MODULES).getValue() : null; + final String joltSpecValue = validationContext.getProperty(JOLT_SPEC).getValue(); - if (validationContext.isExpressionLanguagePresent(specValue) ) { - final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true); + if (StringUtils.isEmpty(joltSpecValue) && !SORTR.getValue().equals(transform)) { + results.add(new ValidationResult.Builder().subject(JOLT_SPEC.getDisplayName()).valid(false).explanation( + "'Jolt Specification' must be set, or the Transformation must be 'Sort'").build()); + } else { + final ClassLoader customClassLoader; + + try { + if (modulePath != null) { + customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter()); + } else { + customClassLoader = this.getClass().getClassLoader(); + } + + final boolean elPresent = validationContext.isExpressionLanguagePresent(joltSpecValue); + + if (elPresent) { + final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(joltSpecValue, true); if (!StringUtils.isEmpty(invalidExpressionMsg)) { results.add(new ValidationResult.Builder().valid(false) .subject(JOLT_SPEC.getDisplayName()) @@ -258,28 +274,24 @@ public class JoltTransformRecord extends AbstractProcessor { .build()); } } else { - //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet - Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"), DEFAULT_CHARSET); + if (!SORTR.getValue().equals(transform)) { - if (CUSTOMR.getValue().equals(transform)) { - if (StringUtils.isEmpty(customTransform)) { - final String customMessage = "A custom transformation class should be provided. "; - results.add(new ValidationResult.Builder().valid(false) - .explanation(customMessage) - .build()); - } else if (validationContext.isExpressionLanguagePresent(customTransform)) { - final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true); - if (!StringUtils.isEmpty(invalidExpressionMsg)) { + //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet + final String content = readTransform(validationContext.getProperty(JOLT_SPEC)); + final Object specJson = JsonUtils.jsonToObject(content.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"), DEFAULT_CHARSET); + + if (CUSTOMR.getValue().equals(transform)) { + if (StringUtils.isEmpty(customTransform)) { + final String customMessage = "A custom transformation class should be provided. "; results.add(new ValidationResult.Builder().valid(false) - .subject(CUSTOM_CLASS.getDisplayName()) - .explanation("Invalid Expression Language: " + invalidExpressionMsg) + .explanation(customMessage) .build()); + } else { + TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson); } } else { - TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), customTransform, specJson); + TransformFactory.getTransform(customClassLoader, transform, specJson); } - } else { - TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), transform, specJson); } } } catch (final Exception e) { @@ -294,7 +306,6 @@ public class JoltTransformRecord extends AbstractProcessor { return results; } - @SuppressWarnings("unchecked") @Override public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { final FlowFile original = session.get(); @@ -337,7 +348,7 @@ public class JoltTransformRecord extends AbstractProcessor { } transformed = session.putAllAttributes(transformed, attributes); - logger.info("{} had no Records to transform", new Object[]{original}); + logger.info("{} had no Records to transform", original); } else { final JoltTransform transform = getTransform(context, original); @@ -375,9 +386,6 @@ public class JoltTransformRecord extends AbstractProcessor { while ((record = reader.nextRecord()) != null) { final List transformedRecords = transform(record, transform); - if (transformedRecords == null) { - throw new ProcessException("Error transforming the record"); - } for (Record transformedRecord : transformedRecords) { writer.write(transformedRecord); } @@ -388,7 +396,7 @@ public class JoltTransformRecord extends AbstractProcessor { try { writer.close(); } catch (final IOException ioe) { - getLogger().warn("Failed to close Writer for {}", new Object[]{transformed}); + getLogger().warn("Failed to close Writer for {}", transformed); } attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); @@ -399,10 +407,10 @@ public class JoltTransformRecord extends AbstractProcessor { final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); transformed = session.putAllAttributes(transformed, attributes); session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - logger.debug("Transformed {}", new Object[]{original}); + logger.debug("Transform completed {}", original); } - } catch (final Exception ex) { - logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString(), ex}); + } catch (final Exception e) { + logger.error("Transform failed for {}", original, e); session.transfer(original, REL_FAILURE); if (transformed != null) { session.remove(transformed); @@ -449,13 +457,15 @@ public class JoltTransformRecord extends AbstractProcessor { final Optional specString; if (context.getProperty(JOLT_SPEC).isSet()) { specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue()); - } else { + } else if (SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { specString = Optional.empty(); + } else { + throw new IllegalArgumentException("'Jolt Specification' must be set, or the Transformation must be Sort."); } return transformCache.get(specString, currString -> { try { - return createTransform(context, currString.orElse(null), flowFile); + return createTransform(context, flowFile); } catch (Exception e) { getLogger().error("Problem getting transform", e); } @@ -463,6 +473,27 @@ public class JoltTransformRecord extends AbstractProcessor { }); } + private String readTransform(final PropertyValue propertyValue, final FlowFile flowFile) { + final String transform; + + if (propertyValue.isExpressionLanguagePresent()) { + transform = propertyValue.evaluateAttributeExpressions(flowFile).getValue(); + } else { + transform = readTransform(propertyValue); + } + + return transform; + } + + private String readTransform(final PropertyValue propertyValue) { + final ResourceReference resourceReference = propertyValue.asResource(); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(resourceReference.read()))) { + return reader.lines().collect(Collectors.joining()); + } catch (final IOException e) { + throw new UncheckedIOException("Read JOLT Transform failed", e); + } + } + @OnScheduled public void setup(final ProcessContext context) { int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger(); @@ -471,10 +502,11 @@ public class JoltTransformRecord extends AbstractProcessor { .build(); } - private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception { + private JoltTransform createTransform(final ProcessContext context, final FlowFile flowFile) throws Exception { final Object specJson; - if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { - specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET); + if ((context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue()))) { + final String resolvedSpec = readTransform(context.getProperty(JOLT_SPEC), flowFile); + specJson = JsonUtils.jsonToObject(resolvedSpec, DEFAULT_CHARSET); } else { specJson = null; } @@ -491,6 +523,10 @@ public class JoltTransformRecord extends AbstractProcessor { ? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input); } + protected FilenameFilter getJarFilenameFilter(){ + return (dir, name) -> (name != null && name.endsWith(".jar")); + } + /** * Recursively replace List objects with Object[]. JOLT expects arrays to be of type List where our Record code uses Object[]. * diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java index bdbcbf61af..dd6b7af8c1 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java @@ -102,6 +102,24 @@ public class TestJoltTransformRecord { assertEquals(3, relationships.size()); } + @Test + public void testRelationshipsCreatedFromFile() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = "./src/test/resources/TestJoltTransformRecord/chainrSpec.json"; + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.enqueue(new byte[0]); + Set relationships = processor.getRelationships(); + assertTrue(relationships.contains(JoltTransformRecord.REL_FAILURE)); + assertTrue(relationships.contains(JoltTransformRecord.REL_SUCCESS)); + assertTrue(relationships.contains(JoltTransformRecord.REL_ORIGINAL)); + assertEquals(3, relationships.size()); + } + @Test public void testInvalidJOLTSpec() throws IOException { generateTestData(1, null); @@ -110,9 +128,14 @@ public class TestJoltTransformRecord { runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, "Pretty Print JSON", "true"); runner.enableControllerService(writer); - final String spec = "[{}]"; + String spec = "[{}]"; runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); runner.assertNotValid(); + + final String specLocation = "src/test/resources/TestJoltTransformRecord/chainrSpec.json"; + spec = new String(Files.readAllBytes(Paths.get(specLocation))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.assertValid(); } @Test @@ -277,7 +300,7 @@ public class TestJoltTransformRecord { runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); @@ -299,7 +322,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); @@ -332,7 +355,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); put("b", 2); put("c", 3); }}); - final Object[] recordArray1 = new Object[] {record1, record2, record3}; + final Object[] recordArray1 = new Object[]{record1, record2, record3}; parser.addRecord((Object) recordArray1); final Record record4 = new MapRecord(xSchema, new HashMap() {{ @@ -345,7 +368,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); put("b", 201); put("c", 301); }}); - final Object[] recordArray2 = new Object[] {record4, record5}; + final Object[] recordArray2 = new Object[]{record4, record5}; parser.addRecord((Object) recordArray2); final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc"))); @@ -365,7 +388,28 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json"))), new String(transformed.toByteArray())); + } + @Test + public void testTransformInputWithShiftrFromFile() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = "./src/test/resources/TestJoltTransformRecord/shiftrSpec.json"; + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json"))), + new String(transformed.toByteArray())); } @Test @@ -382,7 +426,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))), new String(transformed.toByteArray())); @@ -402,7 +446,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutput.json"))), new String(transformed.toByteArray())); @@ -423,7 +467,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutput.json"))), new String(transformed.toByteArray())); @@ -442,7 +486,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); @@ -465,7 +509,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutput.json"))), new String(transformed.toByteArray())); @@ -506,7 +550,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json"))), new String(transformed.toByteArray())); @@ -526,7 +570,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json"))), new String(transformed.toByteArray())); @@ -545,7 +589,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); @@ -570,7 +614,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); runner.enqueue(new byte[0]); runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); -runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); @@ -684,5 +728,4 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); recordGenerator.apply(numRecords, parser); } } - } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java index 040a9bd36e..7542cd4564 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java @@ -31,9 +31,11 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceReference; import org.apache.nifi.components.resource.ResourceType; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -44,7 +46,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.jolt.TransformFactory; import org.apache.nifi.processors.standard.util.jolt.TransformUtils; @@ -52,10 +53,12 @@ import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import java.io.BufferedReader; import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; +import java.io.InputStreamReader; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -64,13 +67,14 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @EventDriven @SideEffectFree @SupportsBatching -@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr","cardinality","sort"}) +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@WritesAttribute(attribute = "mime.type",description = "Always set to application/json") +@WritesAttribute(attribute = "mime.type", description = "Always set to application/json") @CapabilityDescription("Applies a list of Jolt specifications to the flowfile JSON payload. A new FlowFile is created " + "with transformed content and is routed to the 'success' relationship. If the JSON transform " + "fails, the original FlowFile is routed to the 'failure' relationship.") @@ -100,9 +104,12 @@ public class JoltTransformJSON extends AbstractProcessor { public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder() .name("jolt-spec") .displayName("Jolt Specification") - .description("Jolt Specification for transform of JSON data. This value is ignored if the Jolt Sort Transformation is selected.") + .description("Jolt Specification for transformation of JSON data. The value for this property may be the text of a Jolt specification " + + "or the path to a file containing a Jolt specification. 'Jolt Specification' must be set, or " + + "the value is ignored if the Jolt Sort Transformation is selected.") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT) .required(false) .build(); @@ -130,7 +137,7 @@ public class JoltTransformJSON extends AbstractProcessor { static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder() .name("Transform Cache Size") .description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need " - + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.") + + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.") .expressionLanguageSupported(ExpressionLanguageScope.NONE) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("1") @@ -198,15 +205,12 @@ public class JoltTransformJSON extends AbstractProcessor { final List results = new ArrayList<>(super.customValidate(validationContext)); final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue(); final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue(); - final String modulePath = validationContext.getProperty(MODULES).isSet()? validationContext.getProperty(MODULES).getValue() : null; + final String modulePath = validationContext.getProperty(MODULES).isSet() ? validationContext.getProperty(MODULES).getValue() : null; + final String joltSpecBody = validationContext.getProperty(JOLT_SPEC).getValue(); - if(!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())){ - if(!SORTR.getValue().equals(transform)) { - final String message = "A specification is required for this transformation"; - results.add(new ValidationResult.Builder().valid(false) - .explanation(message) - .build()); - } + if (StringUtils.isEmpty(joltSpecBody) && !SORTR.getValue().equals(transform)) { + results.add(new ValidationResult.Builder().subject(JOLT_SPEC.getDisplayName()).valid(false).explanation( + "'Jolt Specification' must be set, or the Transformation must be 'Sort'").build()); } else { final ClassLoader customClassLoader; @@ -214,12 +218,14 @@ public class JoltTransformJSON extends AbstractProcessor { if (modulePath != null && !validationContext.isExpressionLanguagePresent(modulePath)) { customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter()); } else { - customClassLoader = this.getClass().getClassLoader(); + customClassLoader = this.getClass().getClassLoader(); } - final String specValue = validationContext.getProperty(JOLT_SPEC).getValue(); + String specValue = validationContext.getProperty(JOLT_SPEC).getValue(); - if (validationContext.isExpressionLanguagePresent(specValue)) { + final boolean elPresent = validationContext.isExpressionLanguagePresent(specValue); + + if (elPresent) { final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true); if (!StringUtils.isEmpty(invalidExpressionMsg)) { results.add(new ValidationResult.Builder().valid(false) @@ -236,26 +242,31 @@ public class JoltTransformJSON extends AbstractProcessor { .build()); } } else { - //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet - Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{","\\\\\\\\\\$\\{"), DEFAULT_CHARSET); + if (!SORTR.getValue().equals(transform)) { - if (CUSTOMR.getValue().equals(transform)) { - if (StringUtils.isEmpty(customTransform)) { - final String customMessage = "A custom transformation class should be provided. "; - results.add(new ValidationResult.Builder().valid(false) - .explanation(customMessage) - .build()); + ///for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet + final String content = readTransform(validationContext.getProperty(JOLT_SPEC)); + final Object specJson = JsonUtils.jsonToObject(content.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"), DEFAULT_CHARSET); + + if (CUSTOMR.getValue().equals(transform)) { + if (StringUtils.isEmpty(customTransform)) { + final String customMessage = "A custom transformation class should be provided. "; + results.add(new ValidationResult.Builder().valid(false) + .explanation(customMessage) + .build()); + } else { + TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson); + } } else { - TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson); + TransformFactory.getTransform(customClassLoader, transform, specJson); } - } else { - TransformFactory.getTransform(customClassLoader, transform, specJson); } } } catch (final Exception e) { - getLogger().error("processor is not valid: ", e); - String message = "Specification not valid for the selected transformation." ; - results.add(new ValidationResult.Builder().valid(false) + String message = String.format("Specification not valid for the selected transformation: %s", e); + results.add(new ValidationResult.Builder() + .valid(false) + .subject(JOLT_SPEC.getDisplayName()) .explanation(message) .build()); } @@ -278,7 +289,7 @@ public class JoltTransformJSON extends AbstractProcessor { try (final InputStream in = session.read(original)) { inputJson = JsonUtils.jsonToObject(in); } catch (final Exception e) { - logger.error("Failed to transform {}; routing to failure", new Object[] {original, e}); + logger.error("JSON parsing failed for {}", original, e); session.transfer(original, REL_FAILURE); return; } @@ -291,10 +302,10 @@ public class JoltTransformJSON extends AbstractProcessor { Thread.currentThread().setContextClassLoader(customClassLoader); } - final Object transformedJson = TransformUtils.transform(transform,inputJson); + final Object transformedJson = TransformUtils.transform(transform, inputJson); jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? JsonUtils.toPrettyJsonString(transformedJson) : JsonUtils.toJsonString(transformedJson); - } catch (final Exception ex) { - logger.error("Unable to transform {} due to {}", new Object[] {original, ex.toString(), ex}); + } catch (final Exception e) { + logger.error("Transform failed for {}", original, e); session.transfer(original, REL_FAILURE); return; } finally { @@ -303,33 +314,30 @@ public class JoltTransformJSON extends AbstractProcessor { } } - FlowFile transformed = session.write(original, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write(jsonString.getBytes(DEFAULT_CHARSET)); - } - }); + FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(DEFAULT_CHARSET))); final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json"); session.transfer(transformed, REL_SUCCESS); - session.getProvenanceReporter().modifyContent(transformed,"Modified With " + transformType ,stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - logger.info("Transformed {}", new Object[]{original}); + session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + logger.info("Transform completed for {}", original); } private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) { final Optional specString; if (context.getProperty(JOLT_SPEC).isSet()) { specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue()); - } else { + } else if (SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { specString = Optional.empty(); + } else { + throw new IllegalArgumentException("'Jolt Specification' must be set, or the Transformation must be Sort."); } return transformCache.get(specString, currString -> { try { - return createTransform(context, currString.orElse(null), flowFile); + return createTransform(context, flowFile); } catch (Exception e) { - getLogger().error("Problem getting transform", e); + getLogger().error("Transform creation failed", e); } return null; }); @@ -352,15 +360,16 @@ public class JoltTransformJSON extends AbstractProcessor { } else { customClassLoader = this.getClass().getClassLoader(); } - } catch (final Exception ex) { - getLogger().error("Unable to setup processor", ex); + } catch (final Exception e) { + getLogger().error("ClassLoader configuration failed", e); } } - private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception { + private JoltTransform createTransform(final ProcessContext context, final FlowFile flowFile) throws Exception { final Object specJson; - if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { - specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET); + if ((context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue()))) { + final String resolvedSpec = readTransform(context.getProperty(JOLT_SPEC), flowFile); + specJson = JsonUtils.jsonToObject(resolvedSpec, DEFAULT_CHARSET); } else { specJson = null; } @@ -372,8 +381,28 @@ public class JoltTransformJSON extends AbstractProcessor { } } - protected FilenameFilter getJarFilenameFilter(){ - return (dir, name) -> (name != null && name.endsWith(".jar")); + private String readTransform(final PropertyValue propertyValue, final FlowFile flowFile) { + final String transform; + + if (propertyValue.isExpressionLanguagePresent()) { + transform = propertyValue.evaluateAttributeExpressions(flowFile).getValue(); + } else { + transform = readTransform(propertyValue); + } + + return transform; } + private String readTransform(final PropertyValue propertyValue) { + final ResourceReference resourceReference = propertyValue.asResource(); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(resourceReference.read()))) { + return reader.lines().collect(Collectors.joining()); + } catch (final IOException e) { + throw new UncheckedIOException("Read JOLT Transform failed", e); + } + } + + protected FilenameFilter getJarFilenameFilter() { + return (dir, name) -> (name != null && name.endsWith(".jar")); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java index d79690c877..3a37d6c698 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java @@ -59,11 +59,29 @@ public class TestJoltTransformJSON { } @Test - public void testInvalidJOLTSpec() { + public void testRelationshipsCreatedFromFile() throws IOException{ + Processor processor= new JoltTransformJSON(); + final TestRunner runner = TestRunners.newTestRunner(processor); + final String spec = "./src/test/resources/TestJoltTransformJson/chainrSpec.json"; + runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec); + runner.enqueue(JSON_INPUT); + Set relationships = processor.getRelationships(); + assertTrue(relationships.contains(JoltTransformJSON.REL_FAILURE)); + assertTrue(relationships.contains(JoltTransformJSON.REL_SUCCESS)); + assertEquals(2, relationships.size()); + } + + @Test + public void testInvalidJOLTSpec() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON()); - final String spec = "[{}]"; + String spec = "[{}]"; runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec); runner.assertNotValid(); + + final String specLocation = "src/test/resources/TestJoltTransformJson/chainrSpec.json"; + spec = new String(Files.readAllBytes(Paths.get(specLocation))); + runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec); + runner.assertValid(); } @Test @@ -76,7 +94,16 @@ public class TestJoltTransformJSON { } @Test - public void testSpecIsNotSet() { + public void testIncorrectJOLTSpecFromFile() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON()); + final String chainrSpec = "./src/test/resources/TestJoltTransformJson/chainrSpec.json"; + runner.setProperty(JoltTransformJSON.JOLT_SPEC, chainrSpec); + runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.SHIFTR); + runner.assertNotValid(); + } + + @Test + public void testSpecIsNotSet() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON()); runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.SHIFTR); runner.assertNotValid(); @@ -118,6 +145,16 @@ public class TestJoltTransformJSON { runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE); } + @Test + public void testInvalidFlowFileContentJsonFromFile() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON()); + final String spec = "./src/test/resources/TestJoltTransformJson/chainrSpec.json"; + runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec); + runner.enqueue("invalid json"); + runner.run(); + runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE); + } + @Test public void testCustomTransformationWithNoModule() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON()); @@ -201,6 +238,23 @@ public class TestJoltTransformJSON { assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); } + @Test + public void testTransformInputWithShiftrFromFile() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON()); + final String spec = "./src/test/resources/TestJoltTransformJson/shiftrSpec.json"; + runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec); + runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.SHIFTR); + runner.enqueue(JSON_INPUT); + runner.run(); + runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json"); + Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray())); + Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/shiftrOutput.json"))); + assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); + } + @Test public void testTransformInputWithDefaultr() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());