NIFI-2020 - initial commit for custom transformation support

NIFI-2020 - updates to use lambdas/stream wherever possible and fix potential nullpointer issue.

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #564
This commit is contained in:
Yolanda M. Davis 2016-06-22 22:26:16 -04:00 committed by Matt Burgess
parent 1da18a3f40
commit 048ba5366c
20 changed files with 717 additions and 39 deletions

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.file.classloader;
import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.List;
public class ClassLoaderUtils {
public static ClassLoader getCustomClassLoader(String modulePath, ClassLoader parentClassLoader, FilenameFilter filenameFilter) throws MalformedURLException {
String[] modules = modulePath != null? modulePath.split(",") : null;
URL[] classpaths = getURLsForClasspath(modules,filenameFilter);
return createModuleClassLoader(classpaths,parentClassLoader);
}
protected static URL[] getURLsForClasspath(String[] modulePaths, FilenameFilter filenameFilter) throws MalformedURLException {
List<URL> additionalClasspath = new LinkedList<>();
if (modulePaths != null) {
for (String modulePathString : modulePaths) {
File modulePath = new File(modulePathString);
if (modulePath.exists()) {
additionalClasspath.add(modulePath.toURI().toURL());
if (modulePath.isDirectory()) {
File[] files = modulePath.listFiles(filenameFilter);
if (files != null) {
for (File jarFile : files) {
additionalClasspath.add(jarFile.toURI().toURL());
}
}
}
} else {
throw new MalformedURLException("Path specified does not exist");
}
}
}
return additionalClasspath.toArray(new URL[additionalClasspath.size()]);
}
protected static ClassLoader createModuleClassLoader(URL[] modules,ClassLoader parentClassLoader) {
return new URLClassLoader(modules, parentClassLoader);
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.file.classloader;
import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestClassLoaderUtils {
@Test
public void testGetCustomClassLoader() throws MalformedURLException,ClassNotFoundException{
final String jarFilePath = "src/test/resources/TestClassLoaderUtils";
ClassLoader customClassLoader = ClassLoaderUtils.getCustomClassLoader(jarFilePath ,this.getClass().getClassLoader(), getJarFilenameFilter());
assertTrue(customClassLoader != null);
assertTrue(customClassLoader.loadClass("TestSuccess") != null);
}
@Test
public void testGetCustomClassLoaderNoPathSpecified() throws MalformedURLException{
final ClassLoader originalClassLoader = this.getClass().getClassLoader();
ClassLoader customClassLoader = ClassLoaderUtils.getCustomClassLoader(null,originalClassLoader, getJarFilenameFilter());
assertTrue(customClassLoader != null);
try{
customClassLoader.loadClass("TestSuccess");
}catch (ClassNotFoundException cex){
assertTrue(cex.getLocalizedMessage().equals("TestSuccess"));
return;
}
fail("exception did not occur, class should not be found");
}
@Test
public void testGetCustomClassLoaderWithInvalidPath() {
final String jarFilePath = "src/test/resources/FakeTestClassLoaderUtils/TestSuccess.jar";
try {
ClassLoaderUtils.getCustomClassLoader(jarFilePath, this.getClass().getClassLoader(), getJarFilenameFilter());
}catch(MalformedURLException mex){
assertTrue(mex.getLocalizedMessage().equals("Path specified does not exist"));
return;
}
fail("exception did not occur, path should not exist");
}
protected FilenameFilter getJarFilenameFilter(){
return new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return (name != null && name.endsWith(".jar"));
}
};
}
}

View File

@ -17,6 +17,9 @@
package org.apache.nifi.web.standard.api.transformjson;
import java.io.File;
import java.io.FilenameFilter;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@ -24,6 +27,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.apache.nifi.processors.standard.util.TransformFactory;
import org.apache.nifi.web.standard.api.AbstractStandardResource;
import org.apache.nifi.web.standard.api.transformjson.dto.JoltSpecificationDTO;
@ -53,10 +57,8 @@ public class TransformJSONResource extends AbstractStandardResource {
@Path("/validate")
public Response validateSpec(JoltSpecificationDTO specificationDTO) {
Object specJson = getSpecificationJsonObject(specificationDTO.getSpecification());
try {
TransformFactory.getTransform(specificationDTO.getTransform(), specJson);
getTransformation(specificationDTO);
}catch(final Exception e){
logger.error("Validation Failed - " + e.toString());
return Response.ok(new ValidationDTO(false,"Validation Failed - Please verify the provided specification.")).build();
@ -70,10 +72,8 @@ public class TransformJSONResource extends AbstractStandardResource {
@Path("/execute")
public Response executeSpec(JoltSpecificationDTO specificationDTO) {
Object specJson = getSpecificationJsonObject(specificationDTO.getSpecification());
try {
Transform transform = TransformFactory.getTransform(specificationDTO.getTransform(), specJson);
Transform transform = getTransformation(specificationDTO);
Object inputJson = JsonUtils.jsonToObject(specificationDTO.getInput());
return Response.ok(JsonUtils.toJsonString(transform.transform(inputJson))).build();
@ -84,6 +84,37 @@ public class TransformJSONResource extends AbstractStandardResource {
}
protected Transform getTransformation(JoltSpecificationDTO specificationDTO) throws Exception{
Object specJson = getSpecificationJsonObject(specificationDTO.getSpecification());
String transformName = specificationDTO.getTransform();
String modules = specificationDTO.getModules();
ClassLoader classLoader = null;
Transform transform ;
if(modules != null && !modules.isEmpty()){
classLoader = ClassLoaderUtils.getCustomClassLoader(specificationDTO.getModules(),this.getClass().getClassLoader(), getJarFilenameFilter());
} else{
classLoader = this.getClass().getClassLoader();
}
if(transformName.equals("jolt-transform-custom")) {
transform = TransformFactory.getCustomTransform(classLoader,specificationDTO.getCustomClass(), specJson);
}else{
transform = TransformFactory.getTransform(classLoader,specificationDTO.getTransform(), specJson);
}
return transform;
}
protected FilenameFilter getJarFilenameFilter(){
return new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return (name != null && name.endsWith(".jar"));
}
};
}
}

View File

@ -28,6 +28,8 @@ public class JoltSpecificationDTO implements Serializable{
private String transform;
private String specification;
private String input;
private String modules;
private String customClass;
public JoltSpecificationDTO() {
}
@ -60,4 +62,20 @@ public class JoltSpecificationDTO implements Serializable{
public void setInput(String input) {
this.input = input;
}
public String getModules() {
return modules;
}
public void setModules(String modules) {
this.modules = modules;
}
public String getCustomClass() {
return customClass;
}
public void setCustomClass(String customClass) {
this.customClass = customClass;
}
}

View File

@ -53,6 +53,20 @@ var TransformJsonController = function ($scope, $state, $q, TransformJsonService
details['descriptors']['jolt-transform']['defaultValue'] ;
};
$scope.getCustomClass = function(details){
if(details['properties']['jolt-custom-class'] != null && details['properties']['jolt-custom-class'] != "") {
return details['properties']['jolt-custom-class'];
}
else return '';
};
$scope.getCustomModules = function(details){
if(details['properties']['jolt-custom-modules'] != null && details['properties']['jolt-custom-modules'] != "") {
return details['properties']['jolt-custom-modules'];
}
else return '';
};
$scope.getTransformOptions = function(details){
return $scope.convertToArray(details['descriptors']['jolt-transform']['allowableValues']);
};
@ -61,6 +75,8 @@ var TransformJsonController = function ($scope, $state, $q, TransformJsonService
$scope.jsonSpec = $scope.getJsonSpec(details);
$scope.transform = $scope.getTransform(details);
$scope.transformOptions = $scope.getTransformOptions(details);
$scope.customClass = $scope.getCustomClass(details);
$scope.modules = $scope.getCustomModules(details);
};
$scope.populateScopeWithDetails(details.data);
@ -243,7 +259,9 @@ var TransformJsonController = function ($scope, $state, $q, TransformJsonService
return {
"transform": transform,
"specification" : jsonSpec,
"input" : jsonInput
"input" : jsonInput,
"customClass" : $scope.customClass,
"modules": $scope.modules
};
};

View File

@ -136,6 +136,82 @@ public class TestTransformJSONResource extends JerseyTest {
assertTrue(validation.isValid());
}
@Test
public void testValidateWithCustomSpec() {
final NiFiWebConfigurationContext niFiWebConfigurationContext = mock(NiFiWebConfigurationContext.class);
final Map<String,String> properties = new HashMap<>();
properties.put("jolt-transform","jolt-transform-custom");
final ComponentDetails componentDetails = new ComponentDetails.Builder().properties(properties).build();
Mockito.when(servletContext.getAttribute(Mockito.anyString())).thenReturn(niFiWebConfigurationContext);
Mockito.when(niFiWebConfigurationContext.getComponentDetails(any(NiFiWebRequestContext.class))).thenReturn(componentDetails);
JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","[{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}]");
joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
joltSpecificationDTO.setModules("src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar");
ValidationDTO validate = client().resource(getBaseURI()).path("/standard/transformjson/validate").post(ValidationDTO.class, joltSpecificationDTO);
assertNotNull(validate);
assertTrue(validate.isValid());
}
@Test
public void testValidateWithCustomSpecEmptyModule() {
final NiFiWebConfigurationContext niFiWebConfigurationContext = mock(NiFiWebConfigurationContext.class);
final Map<String,String> properties = new HashMap<>();
properties.put("jolt-transform","jolt-transform-custom");
final ComponentDetails componentDetails = new ComponentDetails.Builder().properties(properties).build();
Mockito.when(servletContext.getAttribute(Mockito.anyString())).thenReturn(niFiWebConfigurationContext);
Mockito.when(niFiWebConfigurationContext.getComponentDetails(any(NiFiWebRequestContext.class))).thenReturn(componentDetails);
JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","[{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}]");
joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
ValidationDTO validate = client().resource(getBaseURI()).path("/standard/transformjson/validate").post(ValidationDTO.class, joltSpecificationDTO);
assertNotNull(validate);
assertTrue(!validate.isValid());
}
@Test
public void testValidateWithCustomInvalidSpec() {
final NiFiWebConfigurationContext niFiWebConfigurationContext = mock(NiFiWebConfigurationContext.class);
final Map<String,String> properties = new HashMap<>();
properties.put("jolt-transform","jolt-transform-custom");
final ComponentDetails componentDetails = new ComponentDetails.Builder().properties(properties).build();
Mockito.when(servletContext.getAttribute(Mockito.anyString())).thenReturn(niFiWebConfigurationContext);
Mockito.when(niFiWebConfigurationContext.getComponentDetails(any(NiFiWebRequestContext.class))).thenReturn(componentDetails);
JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}");
joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
joltSpecificationDTO.setModules("src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar");
ValidationDTO validate = client().resource(getBaseURI()).path("/standard/transformjson/validate").post(ValidationDTO.class, joltSpecificationDTO);
assertNotNull(validate);
assertTrue(!validate.isValid());
}
@Test
public void testExecuteWithValidCustomSpec() {
final Diffy diffy = new Diffy();
JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","[{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}]");
String inputJson = "{\"rating\":{\"quality\":2,\"count\":1}}";
joltSpecificationDTO.setInput(inputJson);
joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
joltSpecificationDTO.setModules("src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar");
String responseString = client().resource(getBaseURI()).path("/standard/transformjson/execute").post(String.class, joltSpecificationDTO);
Object transformedJson = JsonUtils.jsonToObject(responseString);
Object compareJson = JsonUtils.jsonToObject("{\"rating\":{\"quality\":2,\"count\":1}, \"custom-id\": 4}");
assertNotNull(transformedJson);
assertTrue(diffy.diff(compareJson, transformedJson).isEmpty());
}
@Test
public void testExecuteWithValidCustomSpecEmptyModule() {
JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","[{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}]");
String inputJson = "{\"rating\":{\"quality\":2,\"count\":1}}";
joltSpecificationDTO.setInput(inputJson);
joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
exception.expect(UniformInterfaceException.class);
client().resource(getBaseURI()).path("/standard/transformjson/execute").post(String.class, joltSpecificationDTO);
}
@Test
public void testExecuteWithInvalidSpec() {

View File

@ -1,6 +1,11 @@
nifi-standard-nar
Copyright 2014-2016 The Apache Software Foundation
This includes derived works from the Apache Software License V2 library Jolt (https://github.com/bazaarvoice/jolt)
Copyright 2013-2014 Bazaarvoice, Inc
The derived work is adapted from com.bazaarvoice.jolt.chainr.ChainrBuilder.java, com.bazaarvoice.jolt.chainr.spec.ChainrSpec.java, com.bazaarvoice.jolt.chainr.spec.ChainrEntry.java and can be found in the org.apache.nifi.processors.standard.util.TransformFactory.java class.
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

View File

@ -316,6 +316,7 @@ language governing permissions and limitations under the License. -->
<exclude>src/test/resources/TestSplitText/6.txt</exclude>
<exclude>src/test/resources/TestJoltTransformJson/input.json</exclude>
<exclude>src/test/resources/TestJoltTransformJson/chainrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformJson/customChainrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformJson/chainrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformJson/cardrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformJson/cardrOutput.json</exclude>

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.processors.standard;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -51,6 +51,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.apache.nifi.processors.standard.util.TransformFactory;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.StreamUtils;
@ -77,13 +78,14 @@ public class JoltTransformJSON extends AbstractProcessor {
public static final AllowableValue REMOVR = new AllowableValue("jolt-transform-remove", "Remove", " Remove values from input data to create the output JSON.");
public static final AllowableValue CARDINALITY = new AllowableValue("jolt-transform-card", "Cardinality", "Change the cardinality of input elements to create the output JSON.");
public static final AllowableValue SORTR = new AllowableValue("jolt-transform-sort", "Sort", "Sort input json key values alphabetically. Any specification set is ignored.");
public static final AllowableValue CUSTOMR = new AllowableValue("jolt-transform-custom", "Custom", "Custom Transformation. Requires Custom Transformation Class Name");
public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
.name("jolt-transform")
.displayName("Jolt Transformation DSL")
.description("Specifies the Jolt Transformation that should be used with the provided specification.")
.required(true)
.allowableValues(CARDINALITY, CHAINR, DEFAULTR, REMOVR, SHIFTR, SORTR)
.allowableValues(CARDINALITY, CHAINR, DEFAULTR, REMOVR, SHIFTR, SORTR,CUSTOMR)
.defaultValue(CHAINR.getValue())
.build();
@ -95,6 +97,24 @@ public class JoltTransformJSON extends AbstractProcessor {
.required(false)
.build();
public static final PropertyDescriptor CUSTOM_CLASS = new PropertyDescriptor.Builder()
.name("jolt-custom-class")
.displayName("Custom Transformation Class Name")
.description("Fully Qualified Class Name for Custom Transformation")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
.name("jolt-custom-modules")
.displayName("Custom Module Directory")
.description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The FlowFile with transformed content will be routed to this relationship")
@ -107,12 +127,15 @@ public class JoltTransformJSON extends AbstractProcessor {
private final static List<PropertyDescriptor> properties;
private final static Set<Relationship> relationships;
private volatile Transform transform;
private volatile ClassLoader customClassLoader;
private final static String DEFAULT_CHARSET = "UTF-8";
static{
final List<PropertyDescriptor> _properties = new ArrayList<>();
_properties.add(JOLT_TRANSFORM);
_properties.add(CUSTOM_CLASS);
_properties.add(MODULES);
_properties.add(JOLT_SPEC);
properties = Collections.unmodifiableList(_properties);
@ -137,8 +160,10 @@ public class JoltTransformJSON extends AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
String specValue = validationContext.getProperty(JOLT_SPEC).isSet() ? validationContext.getProperty(JOLT_SPEC).getValue() : null;
final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
final String specValue = validationContext.getProperty(JOLT_SPEC).isSet() ? validationContext.getProperty(JOLT_SPEC).getValue() : null;
final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
final String modulePath = validationContext.getProperty(MODULES).isSet()? validationContext.getProperty(MODULES).getValue() : null;
if(StringUtils.isEmpty(specValue)){
if(!SORTR.getValue().equals(transform)) {
@ -147,10 +172,35 @@ public class JoltTransformJSON extends AbstractProcessor {
.explanation(message)
.build());
}
} else {
final ClassLoader customClassLoader;
try {
if(modulePath != null) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
}else{
customClassLoader = this.getClass().getClassLoader();
}
Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue, DEFAULT_CHARSET);
TransformFactory.getTransform(transform, specJson);
if(CUSTOMR.getValue().equals(transform)){
if (StringUtils.isEmpty(customTransform)){
final String customMessage = "A custom transformation class should be provided. ";
results.add(new ValidationResult.Builder().valid(false)
.explanation(customMessage)
.build());
}else{
TransformFactory.getCustomTransform(customClassLoader,customTransform, specJson);
}
}else {
TransformFactory.getTransform(customClassLoader, transform, specJson);
}
} catch (final Exception e) {
getLogger().info("Processor is not valid - " + e.toString());
String message = "Specification not valid for the selected transformation." ;
@ -182,8 +232,14 @@ public class JoltTransformJSON extends AbstractProcessor {
});
final String jsonString;
final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
if(customClassLoader != null) {
Thread.currentThread().setContextClassLoader(customClassLoader);
}
final ByteArrayInputStream bais = new ByteArrayInputStream(originalContent);
final Object inputJson = JsonUtils.jsonToObject(bais);
final Object transformedJson = transform.transform(inputJson);
@ -193,6 +249,11 @@ public class JoltTransformJSON extends AbstractProcessor {
logger.error("Unable to transform {} due to {}", new Object[]{original, re});
session.transfer(original, REL_FAILURE);
return;
}finally {
if(customClassLoader != null && originalContextClassLoader != null) {
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
}
}
FlowFile transformed = session.write(original, new OutputStreamCallback() {
@ -212,12 +273,34 @@ public class JoltTransformJSON extends AbstractProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
Object specJson = null;
if(context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
specJson = JsonUtils.jsonToObject(context.getProperty(JOLT_SPEC).getValue(), DEFAULT_CHARSET);
try{
Object specJson = null;
if(context.getProperty(MODULES).isSet()){
customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(),this.getClass().getClassLoader(),getJarFilenameFilter());
}else{
customClassLoader = this.getClass().getClassLoader();
}
if(context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
specJson = JsonUtils.jsonToObject(context.getProperty(JOLT_SPEC).getValue(), DEFAULT_CHARSET);
}
if(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
transform = TransformFactory.getCustomTransform(customClassLoader,context.getProperty(CUSTOM_CLASS).getValue(), specJson);
}else {
transform = TransformFactory.getTransform(customClassLoader, context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
}
} catch (Exception ex){
getLogger().error("Unable to setup processor",ex);
}
transform = TransformFactory.getTransform(context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
}
protected FilenameFilter getJarFilenameFilter(){
return (dir, name) -> (name != null && name.endsWith(".jar"));
}
}

View File

@ -28,6 +28,9 @@
The Jolt utilities processing JSON are not not stream based therefore large JSON document
transformation may consume large amounts of memory. Currently UTF-8 FlowFile content and Jolt specifications are supported.
Custom Jolt Transformations (that implement the Transform interface) are supported. Modules containing custom libraries which do not
existing on the current class path can be included via the custom module directory property.
<Strong>Note:</Strong> When configuring a processor if user selects of the Default transformation yet provides a
Chain specification the system does not alert that the specification is invalid and and will produce failed flow files.
This is a known issue identified within the Jolt library.

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.bazaarvoice.jolt.Chainr;
import com.bazaarvoice.jolt.SpecDriven;
import com.bazaarvoice.jolt.Transform;
public class TestCustomJoltTransform implements SpecDriven,Transform {
final private Transform customTransform;
public TestCustomJoltTransform(Object specJson) {
this.customTransform = Chainr.fromSpec(specJson);
}
@Override
public Object transform(Object o) {
return customTransform.transform(o);
}
public static void main(String[] args) {
System.out.println("This is a Test Custom Transform");
}
}

View File

@ -115,6 +115,56 @@ public class TestJoltTransformJSON {
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE);
}
@Test
public void testCustomTransformationWithNoModule() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/customChainrSpec.json")));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.CUSTOM_CLASS, "TestCustomJoltTransform");
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.CUSTOMR);
runner.enqueue(JSON_INPUT);
runner.run();
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
}
@Test
public void testCustomTransformationWithMissingClassName() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
runner.enqueue(JSON_INPUT);
runner.assertNotValid();
}
@Test
public void testCustomTransformationWithInvalidClassPath() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String customJarPath = "src/test/resources/TestJoltTransformJson/FakeCustomJar.jar";
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"TestCustomJoltTransform");
runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
runner.enqueue(JSON_INPUT);
runner.assertNotValid();
}
@Test
public void testCustomTransformationWithInvalidClassName() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"FakeCustomJoltTransform");
runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
runner.enqueue(JSON_INPUT);
runner.assertNotValid();
}
@Test
public void testTransformInputWithChainr() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
@ -228,5 +278,83 @@ public class TestJoltTransformJSON {
assertTrue(compareJsonString.equals(transformedJsonString));
}
@Test
public void testTransformInputWithCustomTransformationWithJar() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"TestCustomJoltTransform");
runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
runner.enqueue(JSON_INPUT);
runner.run();
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}
@Test
public void testTransformInputWithCustomTransformationWithDir() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String customJarPath = "src/test/resources/TestJoltTransformJson";
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"TestCustomJoltTransform");
runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
runner.enqueue(JSON_INPUT);
runner.run();
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}
@Test
public void testTransformInputWithChainrEmbeddedCustomTransformation() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String customJarPath = "src/test/resources/TestJoltTransformJson";
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/customChainrSpec.json")));
runner.setProperty(JoltTransformJSON.JOLT_SPEC,spec);
runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
runner.enqueue(JSON_INPUT);
runner.run();
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}
@Test
public void testTransformInputCustomTransformationIgnored() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/defaultrSpec.json")));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"TestCustomJoltTransform");
runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.DEFAULTR);
runner.enqueue(JSON_INPUT);
runner.run();
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/defaultrOutput.json")));
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}
}

View File

@ -0,0 +1,35 @@
[
{
"operation":"TestCustomJoltTransform",
"spec" :
[
{
"operation": "shift",
"spec": {
"rating": {
"primary": {
"value": "Rating",
"max": "RatingRange"
},
"*": {
"max": "SecondaryRatings.&1.Range",
"value": "SecondaryRatings.&1.Value",
"$": "SecondaryRatings.&1.Id"
}
}
}
},
{
"operation": "default",
"spec": {
"Range": 5,
"SecondaryRatings": {
"*": {
"Range": 5
}
}
}
}
]
}
]

View File

@ -22,6 +22,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-standard-utils</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.bazaarvoice.jolt</groupId>
<artifactId>jolt-core</artifactId>

View File

@ -17,30 +17,99 @@
package org.apache.nifi.processors.standard.util;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.bazaarvoice.jolt.CardinalityTransform;
import com.bazaarvoice.jolt.Chainr;
import com.bazaarvoice.jolt.Defaultr;
import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.Removr;
import com.bazaarvoice.jolt.Shiftr;
import com.bazaarvoice.jolt.Sortr;
import com.bazaarvoice.jolt.SpecDriven;
import com.bazaarvoice.jolt.Transform;
import com.bazaarvoice.jolt.chainr.spec.ChainrEntry;
import com.bazaarvoice.jolt.exception.SpecException;
public class TransformFactory {
public static Transform getTransform(String transform, Object specJson) {
if (transform.equals("jolt-transform-default")) {
public static Transform getTransform(final ClassLoader classLoader,final String transformType, final Object specJson) throws Exception {
if (transformType.equals("jolt-transform-default")) {
return new Defaultr(specJson);
} else if (transform.equals("jolt-transform-shift")) {
} else if (transformType.equals("jolt-transform-shift")) {
return new Shiftr(specJson);
} else if (transform.equals("jolt-transform-remove")) {
} else if (transformType.equals("jolt-transform-remove")) {
return new Removr(specJson);
} else if (transform.equals("jolt-transform-card")) {
} else if (transformType.equals("jolt-transform-card")) {
return new CardinalityTransform(specJson);
} else if(transform.equals("jolt-transform-sort")){
} else if(transformType.equals("jolt-transform-sort")){
return new Sortr();
} else {
return Chainr.fromSpec(specJson);
} else{
return new Chainr(getChainrJoltTransformations(classLoader,specJson));
}
}
@SuppressWarnings("unchecked")
public static Transform getCustomTransform(final ClassLoader classLoader, final String customTransformType, final Object specJson) throws Exception {
final Class clazz = classLoader.loadClass(customTransformType);
if(SpecDriven.class.isAssignableFrom(clazz)){
final Constructor constructor = clazz.getConstructor(Object.class);
return (Transform) constructor.newInstance(specJson);
}else{
return (Transform) clazz.newInstance();
}
}
protected static List<JoltTransform> getChainrJoltTransformations(ClassLoader classLoader, Object specJson) throws Exception{
if(!(specJson instanceof List)) {
throw new SpecException("JOLT Chainr expects a JSON array of objects - Malformed spec.");
} else {
List operations = (List)specJson;
if(operations.isEmpty()) {
throw new SpecException("JOLT Chainr passed an empty JSON array.");
} else {
ArrayList<JoltTransform> entries = new ArrayList<JoltTransform>(operations.size());
for(Object chainrEntryObj : operations) {
if(!(chainrEntryObj instanceof Map)) {
throw new SpecException("JOLT ChainrEntry expects a JSON map - Malformed spec");
} else {
Map chainrEntryMap = (Map)chainrEntryObj;
String opString = (String) chainrEntryMap.get("operation");
String operationClassName;
if(opString == null) {
throw new SpecException("JOLT Chainr \'operation\' must implement Transform or ContextualTransform");
} else {
if(ChainrEntry.STOCK_TRANSFORMS.containsKey(opString)) {
operationClassName = ChainrEntry.STOCK_TRANSFORMS.get(opString);
} else {
operationClassName = opString;
}
entries.add(getCustomTransform(classLoader,operationClassName,chainrEntryMap.get("spec")));
}
}
}
return entries;
}
}
}
}

View File

@ -17,8 +17,10 @@
package org.apache.nifi.processors.standard.util;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Test;
@ -38,52 +40,76 @@ public class TestTransformFactory {
@Test
public void testGetChainTransform() throws IOException{
public void testGetChainTransform() throws Exception{
final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json")));
Transform transform = TransformFactory.getTransform("jolt-transform-chain",JsonUtils.jsonToObject(chainrSpec));
Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-chain",JsonUtils.jsonToObject(chainrSpec));
assertTrue(transform instanceof Chainr);
}
@Test
public void testGetDefaultTransform() throws IOException{
public void testGetDefaultTransform() throws Exception{
final String defaultrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/defaultrSpec.json")));
Transform transform = TransformFactory.getTransform("jolt-transform-default",JsonUtils.jsonToObject(defaultrSpec));
Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-default",JsonUtils.jsonToObject(defaultrSpec));
assertTrue(transform instanceof Defaultr);
}
@Test
public void testGetSortTransform() throws IOException{
Transform transform = TransformFactory.getTransform("jolt-transform-sort",null);
public void testGetSortTransform() throws Exception{
Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-sort",null);
assertTrue(transform instanceof Sortr);
}
@Test
public void testGetShiftTransform() throws IOException{
public void testGetShiftTransform() throws Exception{
final String shiftrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/shiftrSpec.json")));
Transform transform = TransformFactory.getTransform("jolt-transform-shift",JsonUtils.jsonToObject(shiftrSpec));
Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-shift",JsonUtils.jsonToObject(shiftrSpec));
assertTrue(transform instanceof Shiftr);
}
@Test
public void testGetRemoveTransform() throws IOException{
public void testGetRemoveTransform() throws Exception{
final String removrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/removrSpec.json")));
Transform transform = TransformFactory.getTransform("jolt-transform-remove",JsonUtils.jsonToObject(removrSpec));
Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-remove",JsonUtils.jsonToObject(removrSpec));
assertTrue(transform instanceof Removr);
}
@Test
public void testGetCardinalityTransform() throws IOException{
public void testGetCardinalityTransform() throws Exception{
final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/cardrSpec.json")));
Transform transform = TransformFactory.getTransform("jolt-transform-card",JsonUtils.jsonToObject(cardrSpec));
Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-card",JsonUtils.jsonToObject(cardrSpec));
assertTrue(transform instanceof CardinalityTransform);
}
@Test
public void testGetInvalidTransformWithNoSpec() {
try{
TransformFactory.getTransform("jolt-transform-chain",null);
TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-chain",null);
}catch (Exception e){
assertTrue(e.toString().equals("JOLT Chainr expects a JSON array of objects - Malformed spec."));
assertTrue(e.getLocalizedMessage().equals("JOLT Chainr expects a JSON array of objects - Malformed spec."));
}
}
@Test
public void testGetCustomTransformation() throws Exception{
final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json")));
Path jarFilePath = Paths.get("src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar");
URL[] urlPaths = new URL[1];
urlPaths[0] = jarFilePath.toUri().toURL();
ClassLoader customClassLoader = new URLClassLoader(urlPaths,this.getClass().getClassLoader());
Transform transform = TransformFactory.getCustomTransform(customClassLoader,"TestCustomJoltTransform",JsonUtils.jsonToObject(chainrSpec));
assertTrue(transform != null);
assertTrue(transform.getClass().getName().equals("TestCustomJoltTransform"));
}
@Test
public void testGetCustomTransformationNotFound() throws Exception{
final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json")));
try {
TransformFactory.getCustomTransform(this.getClass().getClassLoader(), "TestCustomJoltTransform", chainrSpec);
}catch (ClassNotFoundException cnf){
assertTrue(cnf.getLocalizedMessage().equals("TestCustomJoltTransform"));
}
}
}