mirror of https://github.com/apache/nifi.git
NIFI-361 - Create Processors to mutate JSON data
Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #354
This commit is contained in:
parent
e3bdee8b1e
commit
defb6f5b61
|
@ -1342,3 +1342,20 @@ For details see https://github.com/svenkubiak/jBCrypt/blob/0.4.1/LICENSE
|
|||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
This product bundles 'Jolt' which is available under an Apache License.
|
||||
For details see https://github.com/bazaarvoice/jolt/blob/jolt-0.0.20/LICENSE
|
||||
|
||||
Copyright 2013-2014 Bazaarvoice, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -602,6 +602,10 @@ The following binary components are provided under the Apache Software License v
|
|||
The following NOTICE information applies:
|
||||
Copyright 2011 JsonPath authors
|
||||
|
||||
(ASLv2) Jolt
|
||||
The following NOTICE information applies:
|
||||
Copyright 2013-2014 Bazaarvoice, Inc
|
||||
|
||||
(ASLv2) Kite SDK
|
||||
The following NOTICE information applies:
|
||||
This product includes software developed by Cloudera, Inc.
|
||||
|
|
|
@ -285,6 +285,7 @@ categorizing them by their functions.
|
|||
- *EncryptContent*: Encrypt or Decrypt Content
|
||||
- *ReplaceText*: Use Regular Expressions to modify textual Content
|
||||
- *TransformXml*: Apply an XSLT transform to XML Content
|
||||
- *TransformJSON*: Apply a JOLT specification to transform JSON Content
|
||||
|
||||
=== Routing and Mediation
|
||||
- *ControlRate*: Throttle the rate at which data can flow through one part of the flow
|
||||
|
|
|
@ -335,3 +335,19 @@ The binary distribution of this product bundles 'jBCrypt' which is available und
|
|||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
This product bundles 'Jolt' which is available under an Apache License. For details see https://github.com/bazaarvoice/jolt/blob/jolt-0.0.20/LICENSE
|
||||
|
||||
Copyright 2013-2014 Bazaarvoice, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -92,6 +92,10 @@ The following binary components are provided under the Apache Software License v
|
|||
The following NOTICE information applies:
|
||||
Copyright 2011 JsonPath authors
|
||||
|
||||
(ASLv2) Jolt
|
||||
The following NOTICE information applies:
|
||||
Copyright 2013-2014 Bazaarvoice, Inc
|
||||
|
||||
(ASLv2) Apache Commons Codec
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Codec
|
||||
|
|
|
@ -225,6 +225,16 @@ language governing permissions and limitations under the License. -->
|
|||
<version>1.4.187</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.bazaarvoice.jolt</groupId>
|
||||
<artifactId>jolt-core</artifactId>
|
||||
<version>0.0.20</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.bazaarvoice.jolt</groupId>
|
||||
<artifactId>json-utils</artifactId>
|
||||
<version>0.0.20</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -299,6 +309,20 @@ language governing permissions and limitations under the License. -->
|
|||
<exclude>src/test/resources/TestSplitText/4.txt</exclude>
|
||||
<exclude>src/test/resources/TestSplitText/5.txt</exclude>
|
||||
<exclude>src/test/resources/TestSplitText/6.txt</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/input.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/chainrSpec.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/chainrOutput.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/cardrSpec.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/cardrOutput.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/defaultrSpec.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/defaultrOutput.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/shiftrSpec.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/sortrOutput.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/shiftrOutput.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/removrSpec.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/removrOutput.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/defaultrSpec.json</exclude>
|
||||
<exclude>src/test/resources/TestTransformJson/defaultrOutput.json</exclude>
|
||||
<exclude>src/test/resources/TestSplitText/original.txt</exclude>
|
||||
<exclude>src/test/resources/TestTransformXml/math.html</exclude>
|
||||
<exclude>src/test/resources/TestTransformXml/tokens.csv</exclude>
|
||||
|
|
|
@ -0,0 +1,247 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.stream.io.ByteArrayInputStream;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import com.bazaarvoice.jolt.CardinalityTransform;
|
||||
import com.bazaarvoice.jolt.Shiftr;
|
||||
import com.bazaarvoice.jolt.Removr;
|
||||
import com.bazaarvoice.jolt.Chainr;
|
||||
import com.bazaarvoice.jolt.Defaultr;
|
||||
import com.bazaarvoice.jolt.Sortr;
|
||||
import com.bazaarvoice.jolt.Transform;
|
||||
import com.bazaarvoice.jolt.JsonUtils;
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr","cardinality","sort"})
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@WritesAttribute(attribute = "mime.type",description = "Always set to application/json")
|
||||
@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created "
|
||||
+ "with transformed content and is routed to the 'success' relationship. If the JSON transform "
|
||||
+ "fails, the original FlowFile is routed to the 'failure' relationship.")
|
||||
public class TransformJSON extends AbstractProcessor {
|
||||
|
||||
public static final AllowableValue SHIFTR = new AllowableValue("jolt-transform-shift", "Shift Transform DSL", "Shift input JSON/data to create the output JSON.");
|
||||
public static final AllowableValue CHAINR = new AllowableValue("jolt-transform-chain", "Chain Transform DSL", "Execute list of JOLT transformations.");
|
||||
public static final AllowableValue DEFAULTR = new AllowableValue("jolt-transform-default", "Default Transform DSL", " Apply default values to the output JSON.");
|
||||
public static final AllowableValue REMOVR = new AllowableValue("jolt-transform-remove", "Remove Transform DSL", " Remove values from input data to create the output JSON.");
|
||||
public static final AllowableValue CARDINALITY = new AllowableValue("jolt-transform-card", "Cardinality Transform DSL", "Change the cardinality of input elements to create the output JSON.");
|
||||
public static final AllowableValue SORTR = new AllowableValue("jolt-transform-sort", "Sort Transform DSL", "Sort input json key values alphabetically. Any specification set is ignored.");
|
||||
|
||||
public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
|
||||
.name("jolt-transform")
|
||||
.displayName("Jolt Transformation")
|
||||
.description("Specifies the Jolt Transformation that should be used with the provided specification.")
|
||||
.required(true)
|
||||
.allowableValues(CARDINALITY, CHAINR, DEFAULTR, REMOVR, SHIFTR, SORTR)
|
||||
.defaultValue(CHAINR.getValue())
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
|
||||
.name("jolt-spec")
|
||||
.displayName("Jolt Specification")
|
||||
.description("Jolt Specification for transform of JSON data. This value is ignored if the Jolt Sort Transformation is selected.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("The FlowFile with transformed content will be routed to this relationship")
|
||||
.build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
|
||||
.build();
|
||||
|
||||
private final static List<PropertyDescriptor> properties;
|
||||
private final static Set<Relationship> relationships;
|
||||
private volatile Transform transform;
|
||||
private final static String DEFAULT_CHARSET = "UTF-8";
|
||||
|
||||
static{
|
||||
|
||||
final List<PropertyDescriptor> _properties = new ArrayList<>();
|
||||
_properties.add(JOLT_TRANSFORM);
|
||||
_properties.add(JOLT_SPEC);
|
||||
properties = Collections.unmodifiableList(_properties);
|
||||
|
||||
final Set<Relationship> _relationships = new HashSet<>();
|
||||
_relationships.add(REL_SUCCESS);
|
||||
_relationships.add(REL_FAILURE);
|
||||
relationships = Collections.unmodifiableSet(_relationships);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
|
||||
String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
|
||||
String specValue = validationContext.getProperty(JOLT_SPEC).isSet() ? validationContext.getProperty(JOLT_SPEC).getValue() : null;
|
||||
|
||||
if(StringUtils.isEmpty(specValue)){
|
||||
if(!SORTR.getValue().equals(transform)) {
|
||||
String message = "A specification is required for this transformation";
|
||||
results.add(new ValidationResult.Builder().valid(false)
|
||||
.explanation(message)
|
||||
.build());
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
Object specJson = JsonUtils.jsonToObject(specValue, DEFAULT_CHARSET);
|
||||
TransformationFactory.getTransform(transform, specJson);
|
||||
} catch (final Exception e) {
|
||||
getLogger().info("Processor is not valid - " + e.toString());
|
||||
String message = "Specification not valid for the selected transformation." ;
|
||||
results.add(new ValidationResult.Builder().valid(false)
|
||||
.explanation(message)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
|
||||
final FlowFile original = session.get();
|
||||
if (original == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ProcessorLog logger = getLogger();
|
||||
final StopWatch stopWatch = new StopWatch(true);
|
||||
|
||||
final byte[] originalContent = new byte[(int) original.getSize()];
|
||||
session.read(original, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(final InputStream in) throws IOException {
|
||||
StreamUtils.fillBuffer(in, originalContent, true);
|
||||
}
|
||||
});
|
||||
|
||||
final String jsonString;
|
||||
|
||||
try {
|
||||
final ByteArrayInputStream bais = new ByteArrayInputStream(originalContent);
|
||||
final Object inputJson = JsonUtils.jsonToObject(bais);
|
||||
final Object transformedJson = transform.transform(inputJson);
|
||||
jsonString = JsonUtils.toJsonString(transformedJson);
|
||||
|
||||
} catch (RuntimeException re) {
|
||||
logger.error("Unable to transform {} due to {}", new Object[]{original, re});
|
||||
session.transfer(original, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
FlowFile transformed = session.write(original, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write(jsonString.getBytes(DEFAULT_CHARSET));
|
||||
}
|
||||
});
|
||||
|
||||
final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
|
||||
transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(),"application/json");
|
||||
session.transfer(transformed, REL_SUCCESS);
|
||||
session.getProvenanceReporter().modifyContent(transformed,"Modified With " + transformType ,stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||
logger.info("Transformed {}", new Object[]{original});
|
||||
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void setup(final ProcessContext context) {
|
||||
Object specJson = null;
|
||||
if(context.getProperty(JOLT_SPEC).isSet()){
|
||||
specJson = JsonUtils.jsonToObject(context.getProperty(JOLT_SPEC).getValue(), DEFAULT_CHARSET);
|
||||
}
|
||||
transform = TransformationFactory.getTransform(context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
|
||||
}
|
||||
|
||||
private static class TransformationFactory {
|
||||
|
||||
static Transform getTransform(String transform, Object specJson) {
|
||||
if (transform.equals(DEFAULTR.getValue())) {
|
||||
return new Defaultr(specJson);
|
||||
} else if (transform.equals(SHIFTR.getValue())) {
|
||||
return new Shiftr(specJson);
|
||||
} else if (transform.equals(REMOVR.getValue())) {
|
||||
return new Removr(specJson);
|
||||
} else if (transform.equals(CARDINALITY.getValue())) {
|
||||
return new CardinalityTransform(specJson);
|
||||
} else if(transform.equals(SORTR.getValue())){
|
||||
return new Sortr();
|
||||
} else {
|
||||
return Chainr.fromSpec(specJson);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -79,6 +79,7 @@ org.apache.nifi.processors.standard.SplitJson
|
|||
org.apache.nifi.processors.standard.SplitText
|
||||
org.apache.nifi.processors.standard.SplitXml
|
||||
org.apache.nifi.processors.standard.TailFile
|
||||
org.apache.nifi.processors.standard.TransformJSON
|
||||
org.apache.nifi.processors.standard.TransformXml
|
||||
org.apache.nifi.processors.standard.UnpackContent
|
||||
org.apache.nifi.processors.standard.ValidateXml
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8"/>
|
||||
<title>TransformJSON</title>
|
||||
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css"/>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<!-- Processor Documentation ================================================== -->
|
||||
<h2>Usage Information</h2>
|
||||
|
||||
<p>
|
||||
The JOLT utilities processing JSON are not not stream based therefore large JSON document
|
||||
transformation may consume large amounts of memory. Currently UTF-8 FlowFile content and Jolt specifications are supported.
|
||||
|
||||
<Strong>Note:</Strong> When configuring a processor if user selects of the Default transformation yet provides a
|
||||
Chain specification the system does not alert that the specification is invalid and and will produce failed flow files.
|
||||
This is a known issue identified within the JOLT library.
|
||||
</p>
|
||||
</body>
|
||||
</html>
|
|
@ -0,0 +1,318 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.stream.io.ByteArrayInputStream;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.bazaarvoice.jolt.CardinalityTransform;
|
||||
import com.bazaarvoice.jolt.Chainr;
|
||||
import com.bazaarvoice.jolt.Defaultr;
|
||||
import com.bazaarvoice.jolt.Diffy;
|
||||
import com.bazaarvoice.jolt.JsonUtils;
|
||||
import com.bazaarvoice.jolt.Removr;
|
||||
import com.bazaarvoice.jolt.Shiftr;
|
||||
import com.bazaarvoice.jolt.Sortr;
|
||||
import com.bazaarvoice.jolt.Transform;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestTransformJSON {
|
||||
|
||||
final static Path JSON_INPUT = Paths.get("src/test/resources/TestTransformJson/input.json");
|
||||
final static Diffy DIFFY = new Diffy();
|
||||
|
||||
@Test
|
||||
public void testRelationshipsCreated() throws IOException{
|
||||
Processor processor= new TransformJSON();
|
||||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json")));
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, spec);
|
||||
runner.enqueue(JSON_INPUT);
|
||||
Set<Relationship> relationships = processor.getRelationships();
|
||||
assertTrue(relationships.contains(TransformJSON.REL_FAILURE));
|
||||
assertTrue(relationships.contains(TransformJSON.REL_SUCCESS));
|
||||
assertTrue(relationships.size() == 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidJOLTSpec() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
final String spec = "[{}]";
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, spec);
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncorrectJOLTSpec() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json")));
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, chainrSpec);
|
||||
runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SHIFTR);
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpecIsNotSet() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SHIFTR);
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpecIsEmpty() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, StringUtils.EMPTY);
|
||||
runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SHIFTR);
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpecNotRequired() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SORTR);
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFlowFileContent() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json")));
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, spec);
|
||||
runner.run();
|
||||
runner.assertQueueEmpty();
|
||||
runner.assertTransferCount(TransformJSON.REL_FAILURE,0);
|
||||
runner.assertTransferCount(TransformJSON.REL_SUCCESS,0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidFlowFileContentJson() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json")));
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, spec);
|
||||
runner.enqueue("invalid json");
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(TransformJSON.REL_FAILURE);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGetChainTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{
|
||||
|
||||
final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json")));
|
||||
List<Class> classes = Arrays.<Class>asList(TransformJSON.class.getDeclaredClasses());
|
||||
Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get();
|
||||
Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class);
|
||||
method.setAccessible(true);
|
||||
Transform transform = (Transform) method.invoke(null,TransformJSON.CHAINR.getValue(),JsonUtils.jsonToObject(chainrSpec));
|
||||
assertTrue(transform instanceof Chainr);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGetRemoveTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{
|
||||
|
||||
final String removrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/removrSpec.json")));
|
||||
List<Class> classes = Arrays.<Class>asList(TransformJSON.class.getDeclaredClasses());
|
||||
Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get();
|
||||
Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class);
|
||||
method.setAccessible(true);
|
||||
Transform transform = (Transform) method.invoke(null,TransformJSON.REMOVR.getValue(),JsonUtils.jsonToObject(removrSpec));
|
||||
assertTrue(transform instanceof Removr);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGetShiftTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{
|
||||
|
||||
final String shiftrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/shiftrSpec.json")));
|
||||
List<Class> classes = Arrays.<Class>asList(TransformJSON.class.getDeclaredClasses());
|
||||
Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get();
|
||||
Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class);
|
||||
method.setAccessible(true);
|
||||
Transform transform = (Transform) method.invoke(null,TransformJSON.SHIFTR.getValue(),JsonUtils.jsonToObject(shiftrSpec));
|
||||
assertTrue(transform instanceof Shiftr);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGetDefaultTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{
|
||||
|
||||
final String defaultrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/defaultrSpec.json")));
|
||||
List<Class> classes = Arrays.<Class>asList(TransformJSON.class.getDeclaredClasses());
|
||||
Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get();
|
||||
Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class);
|
||||
method.setAccessible(true);
|
||||
Transform transform = (Transform) method.invoke(null,TransformJSON.DEFAULTR.getValue(),JsonUtils.jsonToObject(defaultrSpec));
|
||||
assertTrue(transform instanceof Defaultr);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGetCardinalityTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{
|
||||
|
||||
final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/cardrSpec.json")));
|
||||
List<Class> classes = Arrays.<Class>asList(TransformJSON.class.getDeclaredClasses());
|
||||
Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get();
|
||||
Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class);
|
||||
method.setAccessible(true);
|
||||
Transform transform = (Transform) method.invoke(null,TransformJSON.CARDINALITY.getValue(),JsonUtils.jsonToObject(cardrSpec));
|
||||
assertTrue(transform instanceof CardinalityTransform);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGetSortrTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{
|
||||
List<Class> classes = Arrays.<Class>asList(TransformJSON.class.getDeclaredClasses());
|
||||
Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get();
|
||||
Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class);
|
||||
method.setAccessible(true);
|
||||
Transform transform = (Transform) method.invoke(null,TransformJSON.SORTR.getValue(),null);
|
||||
assertTrue(transform instanceof Sortr);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGetInvalidTransformWithNoSpec() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{
|
||||
List<Class> classes = Arrays.<Class>asList(TransformJSON.class.getDeclaredClasses());
|
||||
Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get();
|
||||
Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class);
|
||||
method.setAccessible(true);
|
||||
try {
|
||||
method.invoke(null, TransformJSON.CHAINR.getValue(), null);
|
||||
}catch (InvocationTargetException ite){
|
||||
assertTrue(ite.getTargetException().getLocalizedMessage().equals("JOLT Chainr expects a JSON array of objects - Malformed spec."));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformInputWithChainr() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json")));
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, spec);
|
||||
runner.enqueue(JSON_INPUT);
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS);
|
||||
final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0);
|
||||
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
|
||||
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
|
||||
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
|
||||
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/chainrOutput.json")));
|
||||
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformInputWithShiftr() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/shiftrSpec.json")));
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, spec);
|
||||
runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SHIFTR);
|
||||
runner.enqueue(JSON_INPUT);
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS);
|
||||
final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0);
|
||||
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
|
||||
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
|
||||
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
|
||||
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/shiftrOutput.json")));
|
||||
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformInputWithDefaultr() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/defaultrSpec.json")));
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, spec);
|
||||
runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.DEFAULTR);
|
||||
runner.enqueue(JSON_INPUT);
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS);
|
||||
final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0);
|
||||
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
|
||||
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/defaultrOutput.json")));
|
||||
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformInputWithRemovr() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/removrSpec.json")));
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, spec);
|
||||
runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.REMOVR);
|
||||
runner.enqueue(JSON_INPUT);
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS);
|
||||
final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0);
|
||||
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
|
||||
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/removrOutput.json")));
|
||||
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformInputWithCardinality() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/cardrSpec.json")));
|
||||
runner.setProperty(TransformJSON.JOLT_SPEC, spec);
|
||||
runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.CARDINALITY);
|
||||
runner.enqueue(JSON_INPUT);
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS);
|
||||
final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0);
|
||||
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
|
||||
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/cardrOutput.json")));
|
||||
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformInputWithSortr() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new TransformJSON());
|
||||
runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SORTR);
|
||||
runner.enqueue(JSON_INPUT);
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS);
|
||||
final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0);
|
||||
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
|
||||
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
|
||||
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
|
||||
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/sortrOutput.json")));
|
||||
String transformedJsonString = JsonUtils.toJsonString(transformedJson);
|
||||
String compareJsonString = JsonUtils.toJsonString(compareJson);
|
||||
assertTrue(compareJsonString.equals(transformedJsonString));
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"rating": {
|
||||
"primary": {
|
||||
"value": 3
|
||||
},
|
||||
"quality": {
|
||||
"value": 3
|
||||
},
|
||||
"series": {
|
||||
"value": 5
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
{
|
||||
"rating" : {
|
||||
"series" : {
|
||||
"value" : "ONE"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"Range" : 5,
|
||||
"Rating" : 3,
|
||||
"SecondaryRatings" : {
|
||||
"quality" : {
|
||||
"Id" : "quality",
|
||||
"Range" : 5,
|
||||
"Value" : 3
|
||||
},
|
||||
"series" : {
|
||||
"Id" : "series",
|
||||
"Range" : 5,
|
||||
"Value" : [5,4]
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
[
|
||||
{
|
||||
"operation": "shift",
|
||||
"spec": {
|
||||
"rating": {
|
||||
"primary": {
|
||||
"value": "Rating",
|
||||
"max": "RatingRange"
|
||||
},
|
||||
"*": {
|
||||
"max": "SecondaryRatings.&1.Range",
|
||||
"value": "SecondaryRatings.&1.Value",
|
||||
"$": "SecondaryRatings.&1.Id"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"operation": "default",
|
||||
"spec": {
|
||||
"Range": 5,
|
||||
"SecondaryRatings": {
|
||||
"*": {
|
||||
"Range": 5
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
|
@ -0,0 +1,24 @@
|
|||
{
|
||||
"RatingRange" : 5,
|
||||
"rating": {
|
||||
"primary": {
|
||||
"value": 3,
|
||||
"MaxLabel": "High",
|
||||
"MinLabel": "Low",
|
||||
"DisplayType": "NORMAL"
|
||||
},
|
||||
"quality": {
|
||||
"value": 3,
|
||||
"MaxLabel": "High",
|
||||
"MinLabel": "Low",
|
||||
"DisplayType": "NORMAL"
|
||||
},
|
||||
"series": {
|
||||
"value": [5,4],
|
||||
"MaxLabel": "High",
|
||||
"MinLabel": "Low",
|
||||
"DisplayType": "NORMAL"
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
{
|
||||
"RatingRange" : 5,
|
||||
"rating": {
|
||||
"*": {
|
||||
"MaxLabel": "High",
|
||||
"MinLabel": "Low",
|
||||
"DisplayType": "NORMAL"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"rating": {
|
||||
"primary": {
|
||||
"value": 3
|
||||
},
|
||||
"series": {
|
||||
"value": [5,4]
|
||||
},
|
||||
"quality": {
|
||||
"value": 3
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
{
|
||||
"rating": {
|
||||
"primary": {
|
||||
"value": 3
|
||||
},
|
||||
"series":{
|
||||
"value":[5,4]
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
{
|
||||
"rating": {
|
||||
"quality": ""
|
||||
}
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
{
|
||||
"SecondaryRatings" : {
|
||||
"quality" : {
|
||||
"Value" : 3,
|
||||
"RatingRange" : 3
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
{
|
||||
"rating": {
|
||||
"primary": {
|
||||
"value": "SecondaryRatings.quality.RatingRange"
|
||||
},
|
||||
"quality": {
|
||||
"value": "SecondaryRatings.quality.Value"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"rating": {
|
||||
"primary": {
|
||||
"value": 3
|
||||
},
|
||||
"quality": {
|
||||
"value": 3
|
||||
},
|
||||
"series": {
|
||||
"value": [5,4]
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue