NIFI-4957 Add Resource File Support for Jolt Specifications

This closes #4044

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matthew Burgess 2020-02-10 13:02:49 -05:00 committed by exceptionfactory
parent 63452da617
commit 991e5e24de
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 277 additions and 115 deletions

View File

@ -34,9 +34,11 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -60,10 +62,15 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import java.io.BufferedReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -86,7 +93,7 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type that the configured Record Writer indicates is appropriate"),
})
@CapabilityDescription("Applies a list of Jolt specifications to the FlowFile payload. A new FlowFile is created "
@CapabilityDescription("Applies a JOLT specification to each record in the FlowFile payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success' relationship. If the transform "
+ "fails, the original FlowFile is routed to the 'failure' relationship.")
@RequiresInstanceClassLoading
@ -141,9 +148,11 @@ public class JoltTransformRecord extends AbstractProcessor {
static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
.name("jolt-record-spec")
.displayName("Jolt Specification")
.description("Jolt Specification for transform of record data. This value is ignored if the Jolt Sort Transformation is selected.")
.description("Jolt Specification for transform of record data. The value for this property may be the text of a JOLT specification "
+ "or the path to a file containing a JOLT specification. This value is ignored if the Jolt Sort Transformation is selected.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT)
.required(false)
.build();
@ -238,19 +247,26 @@ public class JoltTransformRecord extends AbstractProcessor {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
if (!SORTR.getValue().equals(transform)) {
final String message = "A specification is required for this transformation";
results.add(new ValidationResult.Builder().valid(false)
.explanation(message)
.build());
}
} else {
try {
final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
final String modulePath = validationContext.getProperty(MODULES).isSet()? validationContext.getProperty(MODULES).getValue() : null;
final String joltSpecValue = validationContext.getProperty(JOLT_SPEC).getValue();
if (validationContext.isExpressionLanguagePresent(specValue) ) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
if (StringUtils.isEmpty(joltSpecValue) && !SORTR.getValue().equals(transform)) {
results.add(new ValidationResult.Builder().subject(JOLT_SPEC.getDisplayName()).valid(false).explanation(
"'Jolt Specification' must be set, or the Transformation must be 'Sort'").build());
} else {
final ClassLoader customClassLoader;
try {
if (modulePath != null) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
} else {
customClassLoader = this.getClass().getClassLoader();
}
final boolean elPresent = validationContext.isExpressionLanguagePresent(joltSpecValue);
if (elPresent) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(joltSpecValue, true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
.subject(JOLT_SPEC.getDisplayName())
@ -258,28 +274,24 @@ public class JoltTransformRecord extends AbstractProcessor {
.build());
}
} else {
//for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
if (!SORTR.getValue().equals(transform)) {
if (CUSTOMR.getValue().equals(transform)) {
if (StringUtils.isEmpty(customTransform)) {
final String customMessage = "A custom transformation class should be provided. ";
results.add(new ValidationResult.Builder().valid(false)
.explanation(customMessage)
.build());
} else if (validationContext.isExpressionLanguagePresent(customTransform)) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
//for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
final String content = readTransform(validationContext.getProperty(JOLT_SPEC));
final Object specJson = JsonUtils.jsonToObject(content.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
if (CUSTOMR.getValue().equals(transform)) {
if (StringUtils.isEmpty(customTransform)) {
final String customMessage = "A custom transformation class should be provided. ";
results.add(new ValidationResult.Builder().valid(false)
.subject(CUSTOM_CLASS.getDisplayName())
.explanation("Invalid Expression Language: " + invalidExpressionMsg)
.explanation(customMessage)
.build());
} else {
TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson);
}
} else {
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), customTransform, specJson);
TransformFactory.getTransform(customClassLoader, transform, specJson);
}
} else {
TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), transform, specJson);
}
}
} catch (final Exception e) {
@ -294,7 +306,6 @@ public class JoltTransformRecord extends AbstractProcessor {
return results;
}
@SuppressWarnings("unchecked")
@Override
public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile original = session.get();
@ -337,7 +348,7 @@ public class JoltTransformRecord extends AbstractProcessor {
}
transformed = session.putAllAttributes(transformed, attributes);
logger.info("{} had no Records to transform", new Object[]{original});
logger.info("{} had no Records to transform", original);
} else {
final JoltTransform transform = getTransform(context, original);
@ -375,9 +386,6 @@ public class JoltTransformRecord extends AbstractProcessor {
while ((record = reader.nextRecord()) != null) {
final List<Record> transformedRecords = transform(record, transform);
if (transformedRecords == null) {
throw new ProcessException("Error transforming the record");
}
for (Record transformedRecord : transformedRecords) {
writer.write(transformedRecord);
}
@ -388,7 +396,7 @@ public class JoltTransformRecord extends AbstractProcessor {
try {
writer.close();
} catch (final IOException ioe) {
getLogger().warn("Failed to close Writer for {}", new Object[]{transformed});
getLogger().warn("Failed to close Writer for {}", transformed);
}
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
@ -399,10 +407,10 @@ public class JoltTransformRecord extends AbstractProcessor {
final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAllAttributes(transformed, attributes);
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.debug("Transformed {}", new Object[]{original});
logger.debug("Transform completed {}", original);
}
} catch (final Exception ex) {
logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString(), ex});
} catch (final Exception e) {
logger.error("Transform failed for {}", original, e);
session.transfer(original, REL_FAILURE);
if (transformed != null) {
session.remove(transformed);
@ -449,13 +457,15 @@ public class JoltTransformRecord extends AbstractProcessor {
final Optional<String> specString;
if (context.getProperty(JOLT_SPEC).isSet()) {
specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
} else {
} else if (SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specString = Optional.empty();
} else {
throw new IllegalArgumentException("'Jolt Specification' must be set, or the Transformation must be Sort.");
}
return transformCache.get(specString, currString -> {
try {
return createTransform(context, currString.orElse(null), flowFile);
return createTransform(context, flowFile);
} catch (Exception e) {
getLogger().error("Problem getting transform", e);
}
@ -463,6 +473,27 @@ public class JoltTransformRecord extends AbstractProcessor {
});
}
private String readTransform(final PropertyValue propertyValue, final FlowFile flowFile) {
final String transform;
if (propertyValue.isExpressionLanguagePresent()) {
transform = propertyValue.evaluateAttributeExpressions(flowFile).getValue();
} else {
transform = readTransform(propertyValue);
}
return transform;
}
private String readTransform(final PropertyValue propertyValue) {
final ResourceReference resourceReference = propertyValue.asResource();
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(resourceReference.read()))) {
return reader.lines().collect(Collectors.joining());
} catch (final IOException e) {
throw new UncheckedIOException("Read JOLT Transform failed", e);
}
}
@OnScheduled
public void setup(final ProcessContext context) {
int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
@ -471,10 +502,11 @@ public class JoltTransformRecord extends AbstractProcessor {
.build();
}
private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception {
private JoltTransform createTransform(final ProcessContext context, final FlowFile flowFile) throws Exception {
final Object specJson;
if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
if ((context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue()))) {
final String resolvedSpec = readTransform(context.getProperty(JOLT_SPEC), flowFile);
specJson = JsonUtils.jsonToObject(resolvedSpec, DEFAULT_CHARSET);
} else {
specJson = null;
}
@ -491,6 +523,10 @@ public class JoltTransformRecord extends AbstractProcessor {
? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input);
}
protected FilenameFilter getJarFilenameFilter(){
return (dir, name) -> (name != null && name.endsWith(".jar"));
}
/**
* Recursively replace List objects with Object[]. JOLT expects arrays to be of type List where our Record code uses Object[].
*

View File

@ -102,6 +102,24 @@ public class TestJoltTransformRecord {
assertEquals(3, relationships.size());
}
@Test
public void testRelationshipsCreatedFromFile() throws IOException {
generateTestData(1, null);
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(writer, "Pretty Print JSON", "true");
runner.enableControllerService(writer);
final String spec = "./src/test/resources/TestJoltTransformRecord/chainrSpec.json";
runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
runner.enqueue(new byte[0]);
Set<Relationship> relationships = processor.getRelationships();
assertTrue(relationships.contains(JoltTransformRecord.REL_FAILURE));
assertTrue(relationships.contains(JoltTransformRecord.REL_SUCCESS));
assertTrue(relationships.contains(JoltTransformRecord.REL_ORIGINAL));
assertEquals(3, relationships.size());
}
@Test
public void testInvalidJOLTSpec() throws IOException {
generateTestData(1, null);
@ -110,9 +128,14 @@ public class TestJoltTransformRecord {
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(writer, "Pretty Print JSON", "true");
runner.enableControllerService(writer);
final String spec = "[{}]";
String spec = "[{}]";
runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
runner.assertNotValid();
final String specLocation = "src/test/resources/TestJoltTransformRecord/chainrSpec.json";
spec = new String(Files.readAllBytes(Paths.get(specLocation)));
runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
runner.assertValid();
}
@Test
@ -277,7 +300,7 @@ public class TestJoltTransformRecord {
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
@ -299,7 +322,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
@ -332,7 +355,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
put("b", 2);
put("c", 3);
}});
final Object[] recordArray1 = new Object[] {record1, record2, record3};
final Object[] recordArray1 = new Object[]{record1, record2, record3};
parser.addRecord((Object) recordArray1);
final Record record4 = new MapRecord(xSchema, new HashMap<String, Object>() {{
@ -345,7 +368,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
put("b", 201);
put("c", 301);
}});
final Object[] recordArray2 = new Object[] {record4, record5};
final Object[] recordArray2 = new Object[]{record4, record5};
parser.addRecord((Object) recordArray2);
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc")));
@ -365,7 +388,28 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json"))),
new String(transformed.toByteArray()));
}
@Test
public void testTransformInputWithShiftrFromFile() throws IOException {
generateTestData(1, null);
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")));
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(writer, "Pretty Print JSON", "true");
runner.enableControllerService(writer);
final String spec = "./src/test/resources/TestJoltTransformRecord/shiftrSpec.json";
runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json"))),
new String(transformed.toByteArray()));
}
@Test
@ -382,7 +426,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))),
new String(transformed.toByteArray()));
@ -402,7 +446,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutput.json"))),
new String(transformed.toByteArray()));
@ -423,7 +467,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutput.json"))),
new String(transformed.toByteArray()));
@ -442,7 +486,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
@ -465,7 +509,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutput.json"))),
new String(transformed.toByteArray()));
@ -506,7 +550,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json"))),
new String(transformed.toByteArray()));
@ -526,7 +570,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json"))),
new String(transformed.toByteArray()));
@ -545,7 +589,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
@ -570,7 +614,7 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
@ -684,5 +728,4 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
recordGenerator.apply(numRecords, parser);
}
}
}

View File

@ -31,9 +31,11 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -44,7 +46,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.jolt.TransformFactory;
import org.apache.nifi.processors.standard.util.jolt.TransformUtils;
@ -52,10 +53,12 @@ import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import java.io.BufferedReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -64,13 +67,14 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr","cardinality","sort"})
@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttribute(attribute = "mime.type",description = "Always set to application/json")
@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
@CapabilityDescription("Applies a list of Jolt specifications to the flowfile JSON payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success' relationship. If the JSON transform "
+ "fails, the original FlowFile is routed to the 'failure' relationship.")
@ -100,9 +104,12 @@ public class JoltTransformJSON extends AbstractProcessor {
public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
.name("jolt-spec")
.displayName("Jolt Specification")
.description("Jolt Specification for transform of JSON data. This value is ignored if the Jolt Sort Transformation is selected.")
.description("Jolt Specification for transformation of JSON data. The value for this property may be the text of a Jolt specification "
+ "or the path to a file containing a Jolt specification. 'Jolt Specification' must be set, or "
+ "the value is ignored if the Jolt Sort Transformation is selected.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT)
.required(false)
.build();
@ -130,7 +137,7 @@ public class JoltTransformJSON extends AbstractProcessor {
static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
.name("Transform Cache Size")
.description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+ "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+ "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
@ -198,15 +205,12 @@ public class JoltTransformJSON extends AbstractProcessor {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
final String modulePath = validationContext.getProperty(MODULES).isSet()? validationContext.getProperty(MODULES).getValue() : null;
final String modulePath = validationContext.getProperty(MODULES).isSet() ? validationContext.getProperty(MODULES).getValue() : null;
final String joltSpecBody = validationContext.getProperty(JOLT_SPEC).getValue();
if(!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())){
if(!SORTR.getValue().equals(transform)) {
final String message = "A specification is required for this transformation";
results.add(new ValidationResult.Builder().valid(false)
.explanation(message)
.build());
}
if (StringUtils.isEmpty(joltSpecBody) && !SORTR.getValue().equals(transform)) {
results.add(new ValidationResult.Builder().subject(JOLT_SPEC.getDisplayName()).valid(false).explanation(
"'Jolt Specification' must be set, or the Transformation must be 'Sort'").build());
} else {
final ClassLoader customClassLoader;
@ -214,12 +218,14 @@ public class JoltTransformJSON extends AbstractProcessor {
if (modulePath != null && !validationContext.isExpressionLanguagePresent(modulePath)) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
} else {
customClassLoader = this.getClass().getClassLoader();
customClassLoader = this.getClass().getClassLoader();
}
final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
if (validationContext.isExpressionLanguagePresent(specValue)) {
final boolean elPresent = validationContext.isExpressionLanguagePresent(specValue);
if (elPresent) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
@ -236,26 +242,31 @@ public class JoltTransformJSON extends AbstractProcessor {
.build());
}
} else {
//for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{","\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
if (!SORTR.getValue().equals(transform)) {
if (CUSTOMR.getValue().equals(transform)) {
if (StringUtils.isEmpty(customTransform)) {
final String customMessage = "A custom transformation class should be provided. ";
results.add(new ValidationResult.Builder().valid(false)
.explanation(customMessage)
.build());
///for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
final String content = readTransform(validationContext.getProperty(JOLT_SPEC));
final Object specJson = JsonUtils.jsonToObject(content.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
if (CUSTOMR.getValue().equals(transform)) {
if (StringUtils.isEmpty(customTransform)) {
final String customMessage = "A custom transformation class should be provided. ";
results.add(new ValidationResult.Builder().valid(false)
.explanation(customMessage)
.build());
} else {
TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson);
}
} else {
TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson);
TransformFactory.getTransform(customClassLoader, transform, specJson);
}
} else {
TransformFactory.getTransform(customClassLoader, transform, specJson);
}
}
} catch (final Exception e) {
getLogger().error("processor is not valid: ", e);
String message = "Specification not valid for the selected transformation." ;
results.add(new ValidationResult.Builder().valid(false)
String message = String.format("Specification not valid for the selected transformation: %s", e);
results.add(new ValidationResult.Builder()
.valid(false)
.subject(JOLT_SPEC.getDisplayName())
.explanation(message)
.build());
}
@ -278,7 +289,7 @@ public class JoltTransformJSON extends AbstractProcessor {
try (final InputStream in = session.read(original)) {
inputJson = JsonUtils.jsonToObject(in);
} catch (final Exception e) {
logger.error("Failed to transform {}; routing to failure", new Object[] {original, e});
logger.error("JSON parsing failed for {}", original, e);
session.transfer(original, REL_FAILURE);
return;
}
@ -291,10 +302,10 @@ public class JoltTransformJSON extends AbstractProcessor {
Thread.currentThread().setContextClassLoader(customClassLoader);
}
final Object transformedJson = TransformUtils.transform(transform,inputJson);
final Object transformedJson = TransformUtils.transform(transform, inputJson);
jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? JsonUtils.toPrettyJsonString(transformedJson) : JsonUtils.toJsonString(transformedJson);
} catch (final Exception ex) {
logger.error("Unable to transform {} due to {}", new Object[] {original, ex.toString(), ex});
} catch (final Exception e) {
logger.error("Transform failed for {}", original, e);
session.transfer(original, REL_FAILURE);
return;
} finally {
@ -303,33 +314,30 @@ public class JoltTransformJSON extends AbstractProcessor {
}
}
FlowFile transformed = session.write(original, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(jsonString.getBytes(DEFAULT_CHARSET));
}
});
FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(DEFAULT_CHARSET)));
final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed,"Modified With " + transformType ,stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transformed {}", new Object[]{original});
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transform completed for {}", original);
}
private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) {
final Optional<String> specString;
if (context.getProperty(JOLT_SPEC).isSet()) {
specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
} else {
} else if (SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specString = Optional.empty();
} else {
throw new IllegalArgumentException("'Jolt Specification' must be set, or the Transformation must be Sort.");
}
return transformCache.get(specString, currString -> {
try {
return createTransform(context, currString.orElse(null), flowFile);
return createTransform(context, flowFile);
} catch (Exception e) {
getLogger().error("Problem getting transform", e);
getLogger().error("Transform creation failed", e);
}
return null;
});
@ -352,15 +360,16 @@ public class JoltTransformJSON extends AbstractProcessor {
} else {
customClassLoader = this.getClass().getClassLoader();
}
} catch (final Exception ex) {
getLogger().error("Unable to setup processor", ex);
} catch (final Exception e) {
getLogger().error("ClassLoader configuration failed", e);
}
}
private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception {
private JoltTransform createTransform(final ProcessContext context, final FlowFile flowFile) throws Exception {
final Object specJson;
if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
if ((context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue()))) {
final String resolvedSpec = readTransform(context.getProperty(JOLT_SPEC), flowFile);
specJson = JsonUtils.jsonToObject(resolvedSpec, DEFAULT_CHARSET);
} else {
specJson = null;
}
@ -372,8 +381,28 @@ public class JoltTransformJSON extends AbstractProcessor {
}
}
protected FilenameFilter getJarFilenameFilter(){
return (dir, name) -> (name != null && name.endsWith(".jar"));
private String readTransform(final PropertyValue propertyValue, final FlowFile flowFile) {
final String transform;
if (propertyValue.isExpressionLanguagePresent()) {
transform = propertyValue.evaluateAttributeExpressions(flowFile).getValue();
} else {
transform = readTransform(propertyValue);
}
return transform;
}
private String readTransform(final PropertyValue propertyValue) {
final ResourceReference resourceReference = propertyValue.asResource();
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(resourceReference.read()))) {
return reader.lines().collect(Collectors.joining());
} catch (final IOException e) {
throw new UncheckedIOException("Read JOLT Transform failed", e);
}
}
protected FilenameFilter getJarFilenameFilter() {
return (dir, name) -> (name != null && name.endsWith(".jar"));
}
}

View File

@ -59,11 +59,29 @@ public class TestJoltTransformJSON {
}
@Test
public void testInvalidJOLTSpec() {
public void testRelationshipsCreatedFromFile() throws IOException{
Processor processor= new JoltTransformJSON();
final TestRunner runner = TestRunners.newTestRunner(processor);
final String spec = "./src/test/resources/TestJoltTransformJson/chainrSpec.json";
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.enqueue(JSON_INPUT);
Set<Relationship> relationships = processor.getRelationships();
assertTrue(relationships.contains(JoltTransformJSON.REL_FAILURE));
assertTrue(relationships.contains(JoltTransformJSON.REL_SUCCESS));
assertEquals(2, relationships.size());
}
@Test
public void testInvalidJOLTSpec() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String spec = "[{}]";
String spec = "[{}]";
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.assertNotValid();
final String specLocation = "src/test/resources/TestJoltTransformJson/chainrSpec.json";
spec = new String(Files.readAllBytes(Paths.get(specLocation)));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.assertValid();
}
@Test
@ -76,7 +94,16 @@ public class TestJoltTransformJSON {
}
@Test
public void testSpecIsNotSet() {
public void testIncorrectJOLTSpecFromFile() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String chainrSpec = "./src/test/resources/TestJoltTransformJson/chainrSpec.json";
runner.setProperty(JoltTransformJSON.JOLT_SPEC, chainrSpec);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.SHIFTR);
runner.assertNotValid();
}
@Test
public void testSpecIsNotSet() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.SHIFTR);
runner.assertNotValid();
@ -118,6 +145,16 @@ public class TestJoltTransformJSON {
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE);
}
@Test
public void testInvalidFlowFileContentJsonFromFile() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String spec = "./src/test/resources/TestJoltTransformJson/chainrSpec.json";
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.enqueue("invalid json");
runner.run();
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE);
}
@Test
public void testCustomTransformationWithNoModule() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
@ -201,6 +238,23 @@ public class TestJoltTransformJSON {
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}
@Test
public void testTransformInputWithShiftrFromFile() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String spec = "./src/test/resources/TestJoltTransformJson/shiftrSpec.json";
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.SHIFTR);
runner.enqueue(JSON_INPUT);
runner.run();
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/shiftrOutput.json")));
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}
@Test
public void testTransformInputWithDefaultr() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());