mirror of https://github.com/apache/nifi.git
Merge branch 'json-processors' of https://github.com/apiri/incubator-nifi into NIFI-360
This commit is contained in:
commit
f05cc9383c
28
nifi/LICENSE
28
nifi/LICENSE
|
@ -455,4 +455,32 @@ This product bundles 'json2.js' which is available in the 'public domain'.
|
|||
This product bundles 'reset.css' which is available in the 'public domain'.
|
||||
For details see http://meyerweb.com/eric/tools/css/reset/
|
||||
|
||||
This product bundles 'asm' which is available under an MIT style license.
|
||||
For details see http://asm.ow2.org/asmdex-license.html
|
||||
|
||||
Copyright (c) 2012 France Télécom
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions
|
||||
are met:
|
||||
1. Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
2. Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in the
|
||||
documentation and/or other materials provided with the distribution.
|
||||
3. Neither the name of the copyright holders nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
|
||||
THE POSSIBILITY OF SUCH DAMAGE.
|
|
@ -150,6 +150,10 @@
|
|||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.jayway.jsonpath</groupId>
|
||||
<artifactId>json-path</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service</artifactId>
|
||||
|
|
|
@ -0,0 +1,240 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.PathNotFoundException;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.*;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processors.standard.util.JsonUtils;
|
||||
import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||
import org.apache.nifi.util.ObjectHolder;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@Tags({"JSON", "evaluate", "JsonPath"})
|
||||
@CapabilityDescription("Evaluates one or more JsonPath expressions against the content of a FlowFile. "
|
||||
+ "The results of those expressions are assigned to FlowFile Attributes or are written to the content of the FlowFile itself, "
|
||||
+ "depending on configuration of the Processor. "
|
||||
+ "JsonPaths are entered by adding user-defined properties; the name of the property maps to the Attribute Name "
|
||||
+ "into which the result will be placed (if the Destination is flowfile-attribute; otherwise, the property name is ignored). "
|
||||
+ "The value of the property must be a valid JsonPath expression. "
|
||||
+ "If the JsonPath evaluates to a JSON array or JSON object and the Return Type is set to 'scalar' the FlowFile will be unmodified and will be routed to failure. "
|
||||
+ "A Return Type of JSON can return scalar values if the provided JsonPath evaluates to the specified value and will be routed as a match."
|
||||
+ "If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be routed to 'unmatched' without having its contents modified. "
|
||||
+ "If Destination is flowfile-attribute and the expression matches nothing, attributes will be created with "
|
||||
+ "empty strings as the value, and the FlowFile will always be routed to 'matched.'")
|
||||
public class EvaluateJsonPath extends AbstractProcessor {
|
||||
|
||||
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
|
||||
public static final String DESTINATION_CONTENT = "flowfile-content";
|
||||
|
||||
public static final String RETURN_TYPE_AUTO = "auto-detect";
|
||||
public static final String RETURN_TYPE_JSON = "json";
|
||||
public static final String RETURN_TYPE_SCALAR = "scalar";
|
||||
|
||||
public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
|
||||
.name("Destination")
|
||||
.description("Indicates whether the results of the JsonPath evaluation are written to the FlowFile content or a FlowFile attribute; if using attribute, must specify the Attribute Name property. If set to flowfile-content, only one JsonPath may be specified, and the property name is ignored.")
|
||||
.required(true)
|
||||
.allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTE)
|
||||
.defaultValue(DESTINATION_CONTENT)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("Return Type")
|
||||
.description("Indicates the desired return type of the JSON Path expressions. Selecting 'auto-detect' will set the return type to 'json' for a Destination of 'flowfile-content', and 'string' for a Destination of 'flowfile-attribute'.")
|
||||
.required(true)
|
||||
.allowableValues(RETURN_TYPE_AUTO, RETURN_TYPE_JSON, RETURN_TYPE_SCALAR)
|
||||
.defaultValue(RETURN_TYPE_AUTO)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles are routed to this relationship when the JsonPath is successfully evaluated and the FlowFile is modified as a result").build();
|
||||
public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched").description("FlowFiles are routed to this relationship when the JsonPath does not match the content of the FlowFile and the Destination is set to flowfile-content").build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles are routed to this relationship when the JsonPath cannot be evaluated against the content of the FlowFile; for instance, if the FlowFile is not valid JSON").build();
|
||||
|
||||
private Set<Relationship> relationships;
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_MATCH);
|
||||
relationships.add(REL_NO_MATCH);
|
||||
relationships.add(REL_FAILURE);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(DESTINATION);
|
||||
properties.add(RETURN_TYPE);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
|
||||
final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
|
||||
|
||||
final String destination = context.getProperty(DESTINATION).getValue();
|
||||
if (DESTINATION_CONTENT.equals(destination)) {
|
||||
int jsonPathCount = 0;
|
||||
|
||||
for (final PropertyDescriptor desc : context.getProperties().keySet()) {
|
||||
if (desc.isDynamic()) {
|
||||
jsonPathCount++;
|
||||
}
|
||||
}
|
||||
|
||||
if (jsonPathCount != 1) {
|
||||
results.add(new ValidationResult.Builder().subject("JsonPaths").valid(false).explanation("Exactly one JsonPath must be set if using destination of " + DESTINATION_CONTENT).build());
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.expressionLanguageSupported(false)
|
||||
.addValidator(JsonUtils.JSON_PATH_VALIDATOR)
|
||||
.required(false)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
|
||||
|
||||
List<FlowFile> flowFiles = processSession.get(50);
|
||||
if (flowFiles.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ProcessorLog logger = getLogger();
|
||||
|
||||
/* Build the JsonPath expressions from attributes */
|
||||
final Map<String, JsonPath> attributeToJsonPathMap = new HashMap<>();
|
||||
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : processContext.getProperties().entrySet()) {
|
||||
if (!entry.getKey().isDynamic()) {
|
||||
continue;
|
||||
}
|
||||
final JsonPath jsonPath = JsonPath.compile(entry.getValue());
|
||||
attributeToJsonPathMap.put(entry.getKey().getName(), jsonPath);
|
||||
}
|
||||
|
||||
final String destination = processContext.getProperty(DESTINATION).getValue();
|
||||
String returnType = processContext.getProperty(RETURN_TYPE).getValue();
|
||||
if (returnType.equals(RETURN_TYPE_AUTO)) {
|
||||
returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
|
||||
}
|
||||
|
||||
flowFileLoop:
|
||||
for (FlowFile flowFile : flowFiles) {
|
||||
|
||||
final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, flowFile);
|
||||
|
||||
if (documentContext == null) {
|
||||
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
|
||||
processSession.transfer(flowFile, REL_FAILURE);
|
||||
continue flowFileLoop;
|
||||
}
|
||||
|
||||
final Map<String, String> jsonPathResults = new HashMap<>();
|
||||
|
||||
jsonPathEvalLoop:
|
||||
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
|
||||
|
||||
String jsonPathAttrKey = attributeJsonPathEntry.getKey();
|
||||
JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
|
||||
|
||||
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
|
||||
try {
|
||||
Object result = documentContext.read(jsonPathExp);
|
||||
if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) {
|
||||
logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
|
||||
new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
|
||||
processSession.transfer(flowFile, REL_FAILURE);
|
||||
continue flowFileLoop;
|
||||
}
|
||||
resultHolder.set(result);
|
||||
} catch (PathNotFoundException e) {
|
||||
logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
|
||||
if (destination.equals(DESTINATION_ATTRIBUTE)) {
|
||||
jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
|
||||
continue jsonPathEvalLoop;
|
||||
} else {
|
||||
processSession.transfer(flowFile, REL_NO_MATCH);
|
||||
continue flowFileLoop;
|
||||
}
|
||||
}
|
||||
|
||||
final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get());
|
||||
switch (destination) {
|
||||
case DESTINATION_ATTRIBUTE:
|
||||
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
|
||||
break;
|
||||
case DESTINATION_CONTENT:
|
||||
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(final OutputStream out) throws IOException {
|
||||
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
||||
outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
|
||||
processSession.transfer(flowFile, REL_MATCH);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.PathNotFoundException;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.*;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processors.standard.util.JsonUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@Tags({"json", "split", "jsonpath"})
|
||||
@CapabilityDescription("Splits a JSON File into multiple, separate FlowFiles for an array element specified by a JsonPath expression. "
|
||||
+ "Each generated FlowFile is comprised of an element of the specified array and transferred to relationship 'split,' "
|
||||
+ "with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or "
|
||||
+ "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.")
|
||||
public class SplitJson extends AbstractProcessor {
|
||||
|
||||
public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
|
||||
.name("JsonPath Expression")
|
||||
.description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.")
|
||||
.required(true)
|
||||
.addValidator(JsonUtils.JSON_PATH_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build();
|
||||
public static final Relationship REL_SPLIT = new Relationship.Builder().name("split").description("All segments of the original FlowFile will be routed to this relationship").build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON or the specified path does not exist), it will be routed to this relationship").build();
|
||||
|
||||
private List<PropertyDescriptor> properties;
|
||||
private Set<Relationship> relationships;
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(ARRAY_JSON_PATH_EXPRESSION);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_ORIGINAL);
|
||||
relationships.add(REL_SPLIT);
|
||||
relationships.add(REL_FAILURE);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
|
||||
final FlowFile original = processSession.get();
|
||||
if (original == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ProcessorLog logger = getLogger();
|
||||
|
||||
|
||||
final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, original);
|
||||
|
||||
if (documentContext == null) {
|
||||
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
|
||||
processSession.transfer(original, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
final String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
|
||||
final JsonPath jsonPath = JsonPath.compile(jsonPathExpression);
|
||||
|
||||
final List<FlowFile> segments = new ArrayList<>();
|
||||
|
||||
Object jsonPathResult;
|
||||
try {
|
||||
jsonPathResult = documentContext.read(jsonPath);
|
||||
} catch (PathNotFoundException e) {
|
||||
logger.warn("JsonPath {} could not be found for FlowFile {}", new Object[]{jsonPath.getPath(), original});
|
||||
processSession.transfer(original, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!(jsonPathResult instanceof List)) {
|
||||
logger.error("The evaluated value {} of {} was not a JSON Array compatible type and cannot be split.",
|
||||
new Object[]{jsonPathResult, jsonPath.getPath()});
|
||||
processSession.transfer(original, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
List resultList = (List) jsonPathResult;
|
||||
|
||||
for (final Object resultSegment : resultList) {
|
||||
FlowFile split = processSession.create(original);
|
||||
split = processSession.write(split, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
String resultSegmentContent = JsonUtils.getResultRepresentation(resultSegment);
|
||||
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
});
|
||||
segments.add(split);
|
||||
}
|
||||
|
||||
processSession.transfer(segments, REL_SPLIT);
|
||||
processSession.transfer(original, REL_ORIGINAL);
|
||||
logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.InvalidPathException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.spi.json.JsonProvider;
|
||||
import net.minidev.json.JSONValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.stream.io.BufferedInputStream;
|
||||
import org.apache.nifi.util.BooleanHolder;
|
||||
import org.apache.nifi.util.ObjectHolder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Provides utilities for interacting with JSON elements and JsonPath expressions and results
|
||||
*
|
||||
* @see <a href="http://json.org">http://json.org</a>
|
||||
* @see <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a>
|
||||
*/
|
||||
public class JsonUtils {
|
||||
|
||||
static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
|
||||
|
||||
public static final Validator JSON_PATH_VALIDATOR = new Validator() {
|
||||
@Override
|
||||
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
||||
String error = null;
|
||||
try {
|
||||
JsonPath compile = JsonPath.compile(input);
|
||||
} catch (InvalidPathException ipe) {
|
||||
error = ipe.toString();
|
||||
}
|
||||
return new ValidationResult.Builder().valid(error == null).explanation(error).build();
|
||||
}
|
||||
};
|
||||
|
||||
public static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
|
||||
|
||||
final BooleanHolder validJsonHolder = new BooleanHolder(false);
|
||||
processSession.read(flowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(InputStream in) throws IOException {
|
||||
validJsonHolder.set(JsonUtils.isValidJson(in));
|
||||
}
|
||||
});
|
||||
|
||||
// Parse the document once into an associated context to support multiple path evaluations if specified
|
||||
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
|
||||
|
||||
if (validJsonHolder.get()) {
|
||||
processSession.read(flowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(InputStream in) throws IOException {
|
||||
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
|
||||
DocumentContext ctx = JsonPath.parse(in);
|
||||
contextHolder.set(ctx);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return contextHolder.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* JSONValue#isValidJson is permissive to the degree of the Smart JSON definition, accordingly a strict JSON approach
|
||||
* is preferred in determining whether or not a document is valid.
|
||||
* Performs a validation of the provided stream according to RFC 4627 as implemented by {@link net.minidev.json.parser.JSONParser#MODE_RFC4627}
|
||||
*
|
||||
* @param inputStream of content to be validated as JSON
|
||||
* @return true, if the content is valid within the bounds of the strictness specified; false otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
public static boolean isValidJson(InputStream inputStream) throws IOException {
|
||||
try (InputStreamReader inputStreamReader = new InputStreamReader(inputStream)) {
|
||||
return JSONValue.isValidJsonStrict(inputStreamReader);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines the context by which JsonSmartJsonProvider would treat the value. {@link java.util.Map} and
|
||||
* {@link java.util.List} objects can be rendered as JSON elements, everything else is treated as a scalar.
|
||||
*
|
||||
* @param obj item to be inspected if it is a scalar or a JSON element
|
||||
* @return false, if the object is a supported type; true otherwise
|
||||
*/
|
||||
public static boolean isJsonScalar(Object obj) {
|
||||
// For the default provider, JsonSmartJsonProvider, a Map or List is able to be handled as a JSON entity
|
||||
return !(obj instanceof Map || obj instanceof List);
|
||||
}
|
||||
|
||||
|
||||
public static String getResultRepresentation(Object jsonPathResult) {
|
||||
if (JsonUtils.isJsonScalar(jsonPathResult)) {
|
||||
return jsonPathResult.toString();
|
||||
}
|
||||
return JSON_PROVIDER.toJson(jsonPathResult);
|
||||
}
|
||||
|
||||
}
|
|
@ -19,6 +19,7 @@ org.apache.nifi.processors.standard.ConvertCharacterSet
|
|||
org.apache.nifi.processors.standard.DetectDuplicate
|
||||
org.apache.nifi.processors.standard.DistributeLoad
|
||||
org.apache.nifi.processors.standard.EncryptContent
|
||||
org.apache.nifi.processors.standard.EvaluateJsonPath
|
||||
org.apache.nifi.processors.standard.EvaluateRegularExpression
|
||||
org.apache.nifi.processors.standard.EvaluateXPath
|
||||
org.apache.nifi.processors.standard.EvaluateXQuery
|
||||
|
@ -54,6 +55,7 @@ org.apache.nifi.processors.standard.ScanAttribute
|
|||
org.apache.nifi.processors.standard.ScanContent
|
||||
org.apache.nifi.processors.standard.SegmentContent
|
||||
org.apache.nifi.processors.standard.SplitContent
|
||||
org.apache.nifi.processors.standard.SplitJson
|
||||
org.apache.nifi.processors.standard.SplitText
|
||||
org.apache.nifi.processors.standard.SplitXml
|
||||
org.apache.nifi.processors.standard.TransformXml
|
||||
|
|
|
@ -0,0 +1,252 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
public class TestEvaluateJsonPath {
|
||||
|
||||
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
|
||||
private static final Path XML_SNIPPET = Paths.get("src/test/resources/TestXml/xml-snippet.xml");
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testInvalidJsonPath() {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
|
||||
testRunner.setProperty("invalid.jsonPath", "$..");
|
||||
|
||||
Assert.fail("An improper JsonPath expression was not detected as being invalid.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidJsonDocument() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
|
||||
|
||||
testRunner.enqueue(XML_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(EvaluateJsonPath.REL_FAILURE, 1);
|
||||
final MockFlowFile out = testRunner.getFlowFilesForRelationship(EvaluateJsonPath.REL_FAILURE).get(0);
|
||||
}
|
||||
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testInvalidConfiguration_destinationContent_twoPaths() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_CONTENT);
|
||||
testRunner.setProperty("JsonPath1", "$[0]._id");
|
||||
testRunner.setProperty("JsonPath2", "$[0].name");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Assert.fail("Processor incorrectly ran with an invalid configuration of multiple paths specified as attributes for a destination of content.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfiguration_destinationAttributes_twoPaths() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
|
||||
testRunner.setProperty("JsonPath1", "$[0]._id");
|
||||
testRunner.setProperty("JsonPath2", "$[0].name");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractPath_destinationAttribute() throws Exception {
|
||||
String jsonPathAttrKey = "JsonPath";
|
||||
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
|
||||
testRunner.setProperty(jsonPathAttrKey, "$[0]._id");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = EvaluateJsonPath.REL_MATCH;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
|
||||
Assert.assertEquals("Transferred flow file did not have the correct result", "54df94072d5dbf7dc6340cc5", out.getAttribute(jsonPathAttrKey));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractPath_destinationAttributes_twoPaths() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
|
||||
testRunner.setProperty(EvaluateJsonPath.RETURN_TYPE, EvaluateJsonPath.RETURN_TYPE_JSON);
|
||||
|
||||
String jsonPathIdAttrKey = "evaluatejson.id";
|
||||
String jsonPathNameAttrKey = "evaluatejson.name";
|
||||
|
||||
testRunner.setProperty(jsonPathIdAttrKey, "$[0]._id");
|
||||
testRunner.setProperty(jsonPathNameAttrKey, "$[0].name");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = EvaluateJsonPath.REL_MATCH;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
|
||||
Assert.assertEquals("Transferred flow file did not have the correct result for id attribute", "54df94072d5dbf7dc6340cc5", out.getAttribute(jsonPathIdAttrKey));
|
||||
Assert.assertEquals("Transferred flow file did not have the correct result for name attribute", "{\"first\":\"Shaffer\",\"last\":\"Pearson\"}", out.getAttribute(jsonPathNameAttrKey));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractPath_destinationAttributes_twoPaths_notFound() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
|
||||
|
||||
String jsonPathIdAttrKey = "evaluatejson.id";
|
||||
String jsonPathNameAttrKey = "evaluatejson.name";
|
||||
|
||||
testRunner.setProperty(jsonPathIdAttrKey, "$[0]._id.nonexistent");
|
||||
testRunner.setProperty(jsonPathNameAttrKey, "$[0].name.nonexistent");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = EvaluateJsonPath.REL_MATCH;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
|
||||
Assert.assertEquals("Transferred flow file did not have the correct result for id attribute", "", out.getAttribute(jsonPathIdAttrKey));
|
||||
Assert.assertEquals("Transferred flow file did not have the correct result for name attribute", "", out.getAttribute(jsonPathNameAttrKey));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractPath_destinationAttributes_twoPaths_oneFound() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
|
||||
|
||||
String jsonPathIdAttrKey = "evaluatejson.id";
|
||||
String jsonPathNameAttrKey = "evaluatejson.name";
|
||||
|
||||
testRunner.setProperty(jsonPathIdAttrKey, "$[0]._id");
|
||||
testRunner.setProperty(jsonPathNameAttrKey, "$[0].name.nonexistent");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = EvaluateJsonPath.REL_MATCH;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
|
||||
Assert.assertEquals("Transferred flow file did not have the correct result for id attribute", "54df94072d5dbf7dc6340cc5", out.getAttribute(jsonPathIdAttrKey));
|
||||
Assert.assertEquals("Transferred flow file did not have the correct result for name attribute", StringUtils.EMPTY, out.getAttribute(jsonPathNameAttrKey));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractPath_destinationContent() throws Exception {
|
||||
String jsonPathAttrKey = "JsonPath";
|
||||
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_CONTENT);
|
||||
testRunner.setProperty(jsonPathAttrKey, "$[0]._id");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = EvaluateJsonPath.REL_MATCH;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals("54df94072d5dbf7dc6340cc5");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testExtractPath_destinationContent_indefiniteResult() throws Exception {
|
||||
String jsonPathAttrKey = "friends.indefinite.id.list";
|
||||
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_CONTENT);
|
||||
testRunner.setProperty(jsonPathAttrKey, "$[0].friends.[*].id");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = EvaluateJsonPath.REL_MATCH;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals("[0,1,2]");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtractPath_destinationContent_indefiniteResult_operators() throws Exception {
|
||||
String jsonPathAttrKey = "friends.indefinite.id.list";
|
||||
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_CONTENT);
|
||||
testRunner.setProperty(jsonPathAttrKey, "$[0].friends[?(@.id < 3)].id");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = EvaluateJsonPath.REL_MATCH;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals("[0,1,2]");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRouteUnmatched_destinationContent_noMatch() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_CONTENT);
|
||||
testRunner.setProperty("jsonPath", "$[0].nonexistent.key");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = EvaluateJsonPath.REL_NO_MATCH;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals(JSON_SNIPPET);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRouteFailure_returnTypeScalar_resultArray() throws Exception {
|
||||
String jsonPathAttrKey = "friends.indefinite.id.list";
|
||||
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||
testRunner.setProperty(EvaluateJsonPath.RETURN_TYPE, EvaluateJsonPath.RETURN_TYPE_SCALAR);
|
||||
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_CONTENT);
|
||||
testRunner.setProperty(jsonPathAttrKey, "$[0].friends[?(@.id < 3)].id");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = EvaluateJsonPath.REL_FAILURE;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals(JSON_SNIPPET);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
public class TestSplitJson {
|
||||
|
||||
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
|
||||
private static final Path XML_SNIPPET = Paths.get("src/test/resources/TestXml/xml-snippet.xml");
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testInvalidJsonPath() {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$..");
|
||||
|
||||
Assert.fail("An improper JsonPath expression was not detected as being invalid.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidJsonDocument() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$");
|
||||
|
||||
testRunner.enqueue(XML_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(SplitJson.REL_FAILURE, 1);
|
||||
final MockFlowFile out = testRunner.getFlowFilesForRelationship(SplitJson.REL_FAILURE).get(0);
|
||||
// Verify that the content was unchanged
|
||||
out.assertContentEquals(XML_SNIPPET);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplit_nonArrayResult() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0]._id");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
Relationship expectedRel = SplitJson.REL_FAILURE;
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
|
||||
final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
|
||||
out.assertContentEquals(JSON_SNIPPET);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplit_arrayResult_oneValue() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0].range[?(@ == 0)]");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
|
||||
testRunner.assertTransferCount(SplitJson.REL_SPLIT, 1);
|
||||
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
|
||||
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplit_arrayResult_multipleValues() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0].range");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
int numSplitsExpected = 10;
|
||||
|
||||
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
|
||||
testRunner.assertTransferCount(SplitJson.REL_SPLIT, numSplitsExpected);
|
||||
final MockFlowFile originalOut = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0);
|
||||
originalOut.assertContentEquals(JSON_SNIPPET);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplit_arrayResult_nonScalarValues() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[*].name");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
|
||||
testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7);
|
||||
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
|
||||
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplit_pathNotFound() throws Exception {
|
||||
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.nonexistent");
|
||||
|
||||
testRunner.enqueue(JSON_SNIPPET);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertTransferCount(SplitJson.REL_FAILURE, 1);
|
||||
testRunner.getFlowFilesForRelationship(SplitJson.REL_FAILURE).get(0).assertContentEquals(JSON_SNIPPET);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,415 @@
|
|||
[
|
||||
{
|
||||
"_id": "54df94072d5dbf7dc6340cc5",
|
||||
"index": 0,
|
||||
"guid": "b9f636cb-b939-42a9-b067-70d286116271",
|
||||
"isActive": true,
|
||||
"balance": "$3,200.07",
|
||||
"picture": "http://placehold.it/32x32",
|
||||
"age": 20,
|
||||
"eyeColor": "brown",
|
||||
"name": {
|
||||
"first": "Shaffer",
|
||||
"last": "Pearson"
|
||||
},
|
||||
"company": "DATAGEN",
|
||||
"email": "shaffer.pearson@datagen.co.uk",
|
||||
"phone": "+1 (972) 588-2272",
|
||||
"address": "662 Rewe Street, Starks, California, 9066",
|
||||
"about": "Aliquip exercitation ad duis irure consectetur magna aliquip amet. Exercitation labore ex laboris non dolor eu. In magna amet non nulla sit laboris do aliqua aliquip. Est elit ipsum ad ea in Lorem mollit Lorem laborum. Ad labore minim aliqua dolore reprehenderit commodo nulla fugiat eiusmod nostrud cillum est. Deserunt minim in non aliqua non.\r\n",
|
||||
"registered": "Wednesday, January 7, 2015 5:51 PM",
|
||||
"latitude": -50.359159,
|
||||
"longitude": -94.01781,
|
||||
"tags": [
|
||||
"ea",
|
||||
"enim",
|
||||
"commodo",
|
||||
"magna",
|
||||
"sunt",
|
||||
"dolore",
|
||||
"aute"
|
||||
],
|
||||
"range": [
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9
|
||||
],
|
||||
"friends": [
|
||||
{
|
||||
"id": 0,
|
||||
"name": "Holloway Kim"
|
||||
},
|
||||
{
|
||||
"id": 1,
|
||||
"name": "Clark Medina"
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Rosemarie Salazar"
|
||||
}
|
||||
],
|
||||
"greeting": "Hello, Shaffer! You have 9 unread messages.",
|
||||
"favoriteFruit": "apple"
|
||||
},
|
||||
{
|
||||
"_id": "54df94073ab1785758096418",
|
||||
"index": 1,
|
||||
"guid": "fda79e72-6489-41f5-bbd5-a5e7d2996dda",
|
||||
"isActive": false,
|
||||
"balance": "$1,416.15",
|
||||
"picture": "http://placehold.it/32x32",
|
||||
"age": 38,
|
||||
"eyeColor": "blue",
|
||||
"name": {
|
||||
"first": "Frazier",
|
||||
"last": "Ramsey"
|
||||
},
|
||||
"company": "STREZZO",
|
||||
"email": "frazier.ramsey@strezzo.biz",
|
||||
"phone": "+1 (909) 448-2724",
|
||||
"address": "624 Cedar Street, Iola, North Carolina, 2827",
|
||||
"about": "Sit sunt eiusmod irure ipsum Lorem irure aliquip cupidatat in proident dolore sunt adipisicing. Aute ipsum reprehenderit aute aliquip ad id pariatur dolor dolore et exercitation. Pariatur est adipisicing eu aliqua ea sint qui. Fugiat officia voluptate anim dolore cupidatat amet. Amet cillum dolor magna elit fugiat.\r\n",
|
||||
"registered": "Sunday, January 5, 2014 1:18 PM",
|
||||
"latitude": -14.729254,
|
||||
"longitude": 126.396861,
|
||||
"tags": [
|
||||
"non",
|
||||
"laboris",
|
||||
"nulla",
|
||||
"commodo",
|
||||
"nostrud",
|
||||
"qui",
|
||||
"ea"
|
||||
],
|
||||
"range": [
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9
|
||||
],
|
||||
"friends": [
|
||||
{
|
||||
"id": 0,
|
||||
"name": "Valenzuela Stone"
|
||||
},
|
||||
{
|
||||
"id": 1,
|
||||
"name": "King Munoz"
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Kari Woodard"
|
||||
}
|
||||
],
|
||||
"greeting": "Hello, Frazier! You have 7 unread messages.",
|
||||
"favoriteFruit": "strawberry"
|
||||
},
|
||||
{
|
||||
"_id": "54df9407369a4d3f1b4aed39",
|
||||
"index": 2,
|
||||
"guid": "b6a68edb-4ddd-487b-b104-f02bec805e4c",
|
||||
"isActive": true,
|
||||
"balance": "$2,487.31",
|
||||
"picture": "http://placehold.it/32x32",
|
||||
"age": 27,
|
||||
"eyeColor": "green",
|
||||
"name": {
|
||||
"first": "Cindy",
|
||||
"last": "Shepherd"
|
||||
},
|
||||
"company": "EMTRAK",
|
||||
"email": "cindy.shepherd@emtrak.org",
|
||||
"phone": "+1 (867) 466-3223",
|
||||
"address": "659 Colin Place, Vaughn, Washington, 1106",
|
||||
"about": "Nulla sunt aliquip eiusmod occaecat duis officia eiusmod aliqua cillum ut. Irure eu est nulla dolor laborum eiusmod Lorem dolore culpa aliquip veniam duis. Sint cupidatat laboris commodo sunt consequat ullamco culpa ad labore. Velit do voluptate quis occaecat ex ipsum cupidatat occaecat dolor officia laborum labore.\r\n",
|
||||
"registered": "Thursday, June 26, 2014 9:56 PM",
|
||||
"latitude": 85.829527,
|
||||
"longitude": -79.452723,
|
||||
"tags": [
|
||||
"cillum",
|
||||
"do",
|
||||
"veniam",
|
||||
"dolore",
|
||||
"voluptate",
|
||||
"et",
|
||||
"adipisicing"
|
||||
],
|
||||
"range": [
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9
|
||||
],
|
||||
"friends": [
|
||||
{
|
||||
"id": 0,
|
||||
"name": "Decker Carver"
|
||||
},
|
||||
{
|
||||
"id": 1,
|
||||
"name": "Donaldson Burgess"
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Santana Heath"
|
||||
}
|
||||
],
|
||||
"greeting": "Hello, Cindy! You have 8 unread messages.",
|
||||
"favoriteFruit": "strawberry"
|
||||
},
|
||||
{
|
||||
"_id": "54df94076f342042d027ca67",
|
||||
"index": 3,
|
||||
"guid": "ac591519-1642-4092-9646-17b4b7a9e38b",
|
||||
"isActive": false,
|
||||
"balance": "$3,480.12",
|
||||
"picture": "http://placehold.it/32x32",
|
||||
"age": 37,
|
||||
"eyeColor": "green",
|
||||
"name": {
|
||||
"first": "Colon",
|
||||
"last": "Gamble"
|
||||
},
|
||||
"company": "RONELON",
|
||||
"email": "colon.gamble@ronelon.net",
|
||||
"phone": "+1 (988) 431-2933",
|
||||
"address": "472 Ryerson Street, Gwynn, Wyoming, 4200",
|
||||
"about": "Ad duis nostrud laboris id aute reprehenderit veniam aute aute laborum exercitation laborum. In minim quis in sunt minim labore deserunt id dolor ea sit. Ipsum tempor Lorem aliqua ad sit quis duis exercitation quis. Dolore voluptate aute ut est non quis do aute exercitation consectetur reprehenderit proident quis.\r\n",
|
||||
"registered": "Tuesday, July 29, 2014 1:38 PM",
|
||||
"latitude": -9.922105,
|
||||
"longitude": -170.581901,
|
||||
"tags": [
|
||||
"fugiat",
|
||||
"incididunt",
|
||||
"proident",
|
||||
"laboris",
|
||||
"id",
|
||||
"ullamco",
|
||||
"non"
|
||||
],
|
||||
"range": [
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9
|
||||
],
|
||||
"friends": [
|
||||
{
|
||||
"id": 0,
|
||||
"name": "Shawn Collins"
|
||||
},
|
||||
{
|
||||
"id": 1,
|
||||
"name": "Holland West"
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Daniel Fischer"
|
||||
}
|
||||
],
|
||||
"greeting": "Hello, Colon! You have 7 unread messages.",
|
||||
"favoriteFruit": "strawberry"
|
||||
},
|
||||
{
|
||||
"_id": "54df94075774d288fc86a912",
|
||||
"index": 4,
|
||||
"guid": "daec0340-7900-4a65-92fc-22e727577660",
|
||||
"isActive": true,
|
||||
"balance": "$3,042.74",
|
||||
"picture": "http://placehold.it/32x32",
|
||||
"age": 36,
|
||||
"eyeColor": "brown",
|
||||
"name": {
|
||||
"first": "Carter",
|
||||
"last": "Russo"
|
||||
},
|
||||
"company": "NORALEX",
|
||||
"email": "carter.russo@noralex.biz",
|
||||
"phone": "+1 (819) 543-3605",
|
||||
"address": "147 Everit Street, Saticoy, Missouri, 5963",
|
||||
"about": "Ea irure non pariatur ipsum. Magna eu enim anim Lorem quis sint cillum. Voluptate proident commodo dolor aute consectetur reprehenderit dolor nostrud ipsum cillum magna dolor. Reprehenderit sit consequat pariatur enim do occaecat exercitation reprehenderit.\r\n",
|
||||
"registered": "Saturday, January 25, 2014 10:12 PM",
|
||||
"latitude": -65.101248,
|
||||
"longitude": 19.867506,
|
||||
"tags": [
|
||||
"dolore",
|
||||
"et",
|
||||
"ex",
|
||||
"eu",
|
||||
"nostrud",
|
||||
"ex",
|
||||
"ad"
|
||||
],
|
||||
"range": [
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9
|
||||
],
|
||||
"friends": [
|
||||
{
|
||||
"id": 0,
|
||||
"name": "Felicia Hull"
|
||||
},
|
||||
{
|
||||
"id": 1,
|
||||
"name": "Jerri Mays"
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Jo Justice"
|
||||
}
|
||||
],
|
||||
"greeting": "Hello, Carter! You have 7 unread messages.",
|
||||
"favoriteFruit": "apple"
|
||||
},
|
||||
{
|
||||
"_id": "54df940741be468e58e87dd3",
|
||||
"index": 5,
|
||||
"guid": "16a037a3-fe30-4c51-8d09-f24ad54f4719",
|
||||
"isActive": true,
|
||||
"balance": "$1,979.92",
|
||||
"picture": "http://placehold.it/32x32",
|
||||
"age": 20,
|
||||
"eyeColor": "blue",
|
||||
"name": {
|
||||
"first": "Claudia",
|
||||
"last": "Houston"
|
||||
},
|
||||
"company": "FISHLAND",
|
||||
"email": "claudia.houston@fishland.com",
|
||||
"phone": "+1 (860) 498-3802",
|
||||
"address": "821 Remsen Avenue, Ada, Vermont, 3101",
|
||||
"about": "Lorem eu deserunt et non id consectetur laborum voluptate id magna labore. Dolore enim voluptate mollit culpa cupidatat officia do aute voluptate Lorem commodo. Nisi nostrud amet in labore ullamco nisi magna adipisicing voluptate aliquip qui consequat enim. Pariatur adipisicing nostrud ut deserunt ad excepteur. Lorem do voluptate adipisicing et laborum commodo nulla excepteur laborum quis tempor proident velit.\r\n",
|
||||
"registered": "Thursday, August 7, 2014 7:48 AM",
|
||||
"latitude": 34.6075,
|
||||
"longitude": -2.643176,
|
||||
"tags": [
|
||||
"enim",
|
||||
"eu",
|
||||
"sint",
|
||||
"qui",
|
||||
"elit",
|
||||
"laboris",
|
||||
"commodo"
|
||||
],
|
||||
"range": [
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9
|
||||
],
|
||||
"friends": [
|
||||
{
|
||||
"id": 0,
|
||||
"name": "Boyd Morrison"
|
||||
},
|
||||
{
|
||||
"id": 1,
|
||||
"name": "Wendi Sandoval"
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Mindy Bush"
|
||||
}
|
||||
],
|
||||
"greeting": "Hello, Claudia! You have 8 unread messages.",
|
||||
"favoriteFruit": "apple"
|
||||
},
|
||||
{
|
||||
"_id": "54df9407fbfc2103751de2e7",
|
||||
"index": 6,
|
||||
"guid": "60241980-5362-41dd-b6e5-e55f174904cf",
|
||||
"isActive": true,
|
||||
"balance": "$3,106.83",
|
||||
"picture": "http://placehold.it/32x32",
|
||||
"age": 40,
|
||||
"eyeColor": "green",
|
||||
"name": {
|
||||
"first": "Beulah",
|
||||
"last": "Myers"
|
||||
},
|
||||
"company": "UNI",
|
||||
"email": "beulah.myers@uni.tv",
|
||||
"phone": "+1 (969) 407-3571",
|
||||
"address": "661 Matthews Court, Osage, Delaware, 1167",
|
||||
"about": "Officia ipsum reprehenderit in nostrud Lorem labore consectetur nulla quis officia ullamco. Eiusmod ipsum deserunt consectetur cillum et duis do esse veniam occaecat Lorem dolor consequat. Lorem esse cupidatat aute et ut.\r\n",
|
||||
"registered": "Sunday, January 25, 2015 8:22 PM",
|
||||
"latitude": 72.620891,
|
||||
"longitude": 155.859974,
|
||||
"tags": [
|
||||
"minim",
|
||||
"fugiat",
|
||||
"irure",
|
||||
"culpa",
|
||||
"exercitation",
|
||||
"labore",
|
||||
"commodo"
|
||||
],
|
||||
"range": [
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9
|
||||
],
|
||||
"friends": [
|
||||
{
|
||||
"id": 0,
|
||||
"name": "Corina Francis"
|
||||
},
|
||||
{
|
||||
"id": 1,
|
||||
"name": "Vera Carson"
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Blevins Camacho"
|
||||
}
|
||||
],
|
||||
"greeting": "Hello, Beulah! You have 8 unread messages.",
|
||||
"favoriteFruit": "apple"
|
||||
}
|
||||
]
|
|
@ -783,6 +783,11 @@
|
|||
<artifactId>nifi-write-ahead-log</artifactId>
|
||||
<version>0.0.2-incubating-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.jayway.jsonpath</groupId>
|
||||
<artifactId>json-path</artifactId>
|
||||
<version>1.2.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<dependencies>
|
||||
|
|
Loading…
Reference in New Issue