NIFI-1893 Add processor for validating JSON

This closes #1037.
This commit is contained in:
Bartosz Wozniak 2016-09-19 13:37:54 +02:00 committed by Pierre Villard
parent 77f8503e98
commit f11682202b
8 changed files with 492 additions and 0 deletions

View File

@ -249,6 +249,11 @@ language governing permissions and limitations under the License. -->
<artifactId>super-csv</artifactId> <artifactId>super-csv</artifactId>
<version>2.4.0</version> <version>2.4.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.everit.json</groupId>
<artifactId>org.everit.json.schema</artifactId>
<version>1.4.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
@ -274,6 +279,9 @@ language governing permissions and limitations under the License. -->
<exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude> <exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude>
<exclude>src/test/resources/TestIdentifyMimeType/1.csv</exclude> <exclude>src/test/resources/TestIdentifyMimeType/1.csv</exclude>
<exclude>src/test/resources/TestJson/json-sample.json</exclude> <exclude>src/test/resources/TestJson/json-sample.json</exclude>
<exclude>src/test/resources/TestJson/json-sample-schema.json</exclude>
<exclude>src/test/resources/TestJson/json-object-sample.json</exclude>
<exclude>src/test/resources/TestJson/json-object-sample-schema.json</exclude>
<exclude>src/test/resources/TestJson/control-characters.json</exclude> <exclude>src/test/resources/TestJson/control-characters.json</exclude>
<exclude>src/test/resources/TestMergeContent/demarcate</exclude> <exclude>src/test/resources/TestMergeContent/demarcate</exclude>
<exclude>src/test/resources/TestMergeContent/foot</exclude> <exclude>src/test/resources/TestMergeContent/foot</exclude>

View File

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import org.everit.json.schema.Schema;
import org.everit.json.schema.ValidationException;
import org.everit.json.schema.loader.SchemaLoader;
import org.json.JSONArray;
import org.json.JSONObject;
import org.json.JSONTokener;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"json", "schema", "validation"})
@CapabilityDescription("Validates the contents of FlowFiles against a user-specified JSON Schema file")
/**
 * Processor that validates the content of each incoming FlowFile against a JSON Schema.
 * <p>
 * The schema is supplied either as a file path ({@link #SCHEMA_FILE}) or inline
 * ({@link #SCHEMA_BODY}); exactly one of the two must be configured. The schema is parsed
 * once when the processor is scheduled and reused for every FlowFile. Content that conforms
 * to the schema is routed to {@link #REL_VALID}; content that violates the schema, or that
 * cannot be parsed as JSON at all, is routed to {@link #REL_INVALID}.
 */
public class ValidateJson extends AbstractProcessor {

    public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
            .name("validate-json-schema-file")
            .displayName("Schema File")
            .description("The path to the Schema file that is to be used for validation. Only one of Schema File or Schema Body may be used")
            .required(false)
            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
            .build();

    public static final PropertyDescriptor SCHEMA_BODY = new PropertyDescriptor.Builder()
            .name("validate-json-schema-body")
            .displayName("Schema Body")
            .required(false)
            .description("Json Schema Body that is to be used for validation. Only one of Schema File or Schema Body may be used")
            .expressionLanguageSupported(false)
            .addValidator(Validator.VALID)
            .build();

    public static final Relationship REL_VALID = new Relationship.Builder()
            .name("valid")
            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
            .build();

    public static final Relationship REL_INVALID = new Relationship.Builder()
            .name("invalid")
            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
            .build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    /** Schema parsed in {@link #parseSchema(ProcessContext)} and read by {@code onTrigger}. */
    private final AtomicReference<Schema> schemaRef = new AtomicReference<>();

    /**
     * Custom validation for ensuring exactly one of Schema File or Schema Body is populated.
     *
     * @param validationContext provides a mechanism for obtaining externally
     * managed values, such as property values and supplies convenience methods
     * for operating on those values
     * @return A collection of validation results; empty when the configuration is acceptable
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        Set<ValidationResult> results = new HashSet<>();

        // Verify that exactly one of "Schema File" or "Schema Body" is set:
        // the two emptiness checks are equal when both are set or both are missing.
        Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties();
        if (StringUtils.isEmpty(propertyMap.get(SCHEMA_FILE)) == StringUtils.isEmpty(propertyMap.get(SCHEMA_BODY))) {
            results.add(new ValidationResult.Builder().valid(false).explanation(
                    "Exactly one of Schema File or Schema Body must be set").build());
        }

        return results;
    }

    /**
     * Builds the immutable lists of supported properties and relationships.
     *
     * @param context initialization context supplied by the framework (not used here)
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(SCHEMA_FILE);
        properties.add(SCHEMA_BODY);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_VALID);
        relationships.add(REL_INVALID);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Loads and compiles the configured JSON Schema when the processor is scheduled.
     * <p>
     * The schema text is taken from the Schema File when that property is set, otherwise
     * from the Schema Body. The file is decoded explicitly as UTF-8 so that the result does
     * not depend on the platform default charset.
     *
     * @param context provides access to the configured property values
     * @throws IOException if the schema file cannot be read
     */
    @OnScheduled
    public void parseSchema(final ProcessContext context) throws IOException {
        final String rawSchema;
        if (context.getProperty(SCHEMA_FILE).isSet()) {
            final File schemaFile = new File(context.getProperty(SCHEMA_FILE).getValue());
            try (FileInputStream inputStream = new FileInputStream(schemaFile)) {
                rawSchema = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            }
        } else {
            rawSchema = context.getProperty(SCHEMA_BODY).getValue();
        }

        final JSONObject jsonObjectSchema = new JSONObject(new JSONTokener(rawSchema));
        this.schemaRef.set(SchemaLoader.load(jsonObjectSchema));
    }

    /**
     * Validates one FlowFile against the loaded schema and routes it to
     * {@link #REL_VALID} or {@link #REL_INVALID}.
     *
     * @param context provides access to the processor configuration (not used here)
     * @param session the session used to obtain, read and transfer the FlowFile
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final Schema schema = schemaRef.get();
        final ComponentLog logger = getLogger();
        final AtomicBoolean valid = new AtomicBoolean(true);

        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) {
                try {
                    // Trim first so that leading whitespace or newlines do not cause a
                    // JSON array to be mistaken for a JSON object.
                    final String json = IOUtils.toString(in, StandardCharsets.UTF_8).trim();
                    if (json.startsWith("[")) {
                        schema.validate(new JSONArray(json)); // throws a ValidationException if this array is invalid
                    } else {
                        schema.validate(new JSONObject(json)); // throws a ValidationException if this object is invalid
                    }
                } catch (final IllegalArgumentException | ValidationException | org.json.JSONException | IOException e) {
                    // org.json.JSONException is unchecked and signals content that is not
                    // parseable JSON; treat it as invalid instead of failing the session.
                    valid.set(false);
                    logger.debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e});
                }
            }
        });

        if (valid.get()) {
            logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
            session.getProvenanceReporter().route(flowFile, REL_VALID);
            session.transfer(flowFile, REL_VALID);
        } else {
            logger.debug("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile});
            session.getProvenanceReporter().route(flowFile, REL_INVALID);
            session.transfer(flowFile, REL_INVALID);
        }
    }
}

View File

@ -88,6 +88,7 @@ org.apache.nifi.processors.standard.TransformXml
org.apache.nifi.processors.standard.UnpackContent org.apache.nifi.processors.standard.UnpackContent
org.apache.nifi.processors.standard.ValidateXml org.apache.nifi.processors.standard.ValidateXml
org.apache.nifi.processors.standard.ValidateCsv org.apache.nifi.processors.standard.ValidateCsv
org.apache.nifi.processors.standard.ValidateJson
org.apache.nifi.processors.standard.ExecuteSQL org.apache.nifi.processors.standard.ExecuteSQL
org.apache.nifi.processors.standard.FetchDistributedMapCache org.apache.nifi.processors.standard.FetchDistributedMapCache
org.apache.nifi.processors.standard.ListFTP org.apache.nifi.processors.standard.ListFTP

View File

@ -0,0 +1,34 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ValidateJson</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css"/>
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Usage Information</h2>
<p>
The ValidateJson processor validates FlowFile content using the Everit json-schema library.
The documentation for this Java library can be found
<a href="https://github.com/everit-org/json-schema" target="_blank">here</a>.
</p>
</body>
</html>

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.nio.file.Paths;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.xml.sax.SAXException;
/**
 * Tests for {@link ValidateJson}: each case configures a schema (by file path or inline
 * body), enqueues one sample document and checks which relationship it is routed to.
 */
public class TestValidateJson {

    private static final String ARRAY_SAMPLE = "src/test/resources/TestJson/json-sample.json";
    private static final String OBJECT_SAMPLE = "src/test/resources/TestJson/json-object-sample.json";

    /** Object schema demanding a field that neither sample document contains. */
    private static final String MISSING_FIELD_SCHEMA = "{\"type\": \"object\",\"required\": [\"missingField\"]}";

    @Test
    public void testValidJsonArraySchemaFile() throws IOException, SAXException {
        final TestRunner testRunner = runnerWithSchemaFile("src/test/resources/TestJson/json-sample-schema.json");
        enqueueAndRun(testRunner, ARRAY_SAMPLE);
        testRunner.assertAllFlowFilesTransferred(ValidateJson.REL_VALID, 1);
    }

    @Test
    public void testValidJsonObjectSchemaFile() throws IOException, SAXException {
        final TestRunner testRunner = runnerWithSchemaFile("src/test/resources/TestJson/json-object-sample-schema.json");
        enqueueAndRun(testRunner, OBJECT_SAMPLE);
        testRunner.assertAllFlowFilesTransferred(ValidateJson.REL_VALID, 1);
    }

    @Test
    public void testValidJsonArraySchemaBody() throws IOException, SAXException {
        final TestRunner testRunner = runnerWithSchemaBody(readClasspathResource("TestJson/json-sample-schema.json"));
        enqueueAndRun(testRunner, ARRAY_SAMPLE);
        testRunner.assertAllFlowFilesTransferred(ValidateJson.REL_VALID, 1);
    }

    @Test
    public void testValidJsonObjectSchemaBody() throws IOException, SAXException {
        final TestRunner testRunner = runnerWithSchemaBody(readClasspathResource("TestJson/json-object-sample-schema.json"));
        enqueueAndRun(testRunner, OBJECT_SAMPLE);
        testRunner.assertAllFlowFilesTransferred(ValidateJson.REL_VALID, 1);
    }

    @Test
    public void testInvalidJsonArraySchemaBody() throws IOException, SAXException {
        // The schema describes an object, so the array sample cannot satisfy it.
        final TestRunner testRunner = runnerWithSchemaBody(MISSING_FIELD_SCHEMA);
        enqueueAndRun(testRunner, ARRAY_SAMPLE);
        testRunner.assertAllFlowFilesTransferred(ValidateJson.REL_INVALID, 1);
    }

    @Test
    public void testInvalidJsonObjectSchemaBody() throws IOException, SAXException {
        // The schema requires "missingField", which the object sample does not contain.
        final TestRunner testRunner = runnerWithSchemaBody(MISSING_FIELD_SCHEMA);
        enqueueAndRun(testRunner, OBJECT_SAMPLE);
        testRunner.assertAllFlowFilesTransferred(ValidateJson.REL_INVALID, 1);
    }

    /** Creates a runner whose schema is configured through the Schema File property. */
    private static TestRunner runnerWithSchemaFile(final String schemaPath) {
        final TestRunner testRunner = TestRunners.newTestRunner(new ValidateJson());
        testRunner.setProperty(ValidateJson.SCHEMA_FILE, schemaPath);
        return testRunner;
    }

    /** Creates a runner whose schema is configured through the Schema Body property. */
    private static TestRunner runnerWithSchemaBody(final String schemaBody) {
        final TestRunner testRunner = TestRunners.newTestRunner(new ValidateJson());
        testRunner.setProperty(ValidateJson.SCHEMA_BODY, schemaBody);
        return testRunner;
    }

    /** Reads a classpath resource fully into a string, decoding it as UTF-8. */
    private String readClasspathResource(final String resourceName) throws IOException {
        return IOUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName), "UTF-8");
    }

    /** Enqueues the file at the given path as a single FlowFile and runs the processor once. */
    private static void enqueueAndRun(final TestRunner testRunner, final String inputPath) throws IOException {
        testRunner.enqueue(Paths.get(inputPath));
        testRunner.run();
    }
}

View File

@ -0,0 +1,41 @@
{
"type": "object",
"required": ["_id", "index", "guid", "isActive", "balance", "picture", "age", "eyeColor", "name", "company", "email", "phone" ,"address", "about", "registered", "latitude", "longitude", "tags", "range", "friends", "greeting", "favoriteFruit"],
"properties": {
"_id": {"type": "string"},
"index": {"type": "integer"},
"guid": {"type": "string"},
"isActive": { "type": "boolean"},
"balance": {"type": "string"},
"picture": {"type": "string"},
"age": {"type": "integer"},
"eyeColor": {"type": "string"},
"name": {
"type": "object",
"properties": {
"first": {"type":"string"},
"last": {"type":"string"}
}
},
"company": {"type": "string"},
"email": {"type": "string"},
"phone": {"type": "string"},
"address": {"type": "string"},
"about": {"type": "string"},
"registered": {"type": "string"},
"latitude": {"type": "number"},
"longitude": {"type": "number"},
"tags": {"type": "array"},
"range": {"type": "array"},
"friends": {
"type": "array",
"required": ["id", "me"],
"properties": {
"id": {"type":"integer"},
"name": {"type":"string"}
}
},
"greeting": {"type": "string"},
"favoriteFruit": {"type": "string"}
}
}

View File

@ -0,0 +1,59 @@
{
"_id": "54df94072d5dbf7dc6340cc5",
"index": 0,
"guid": "b9f636cb-b939-42a9-b067-70d286116271",
"isActive": true,
"balance": "$3,200.07",
"picture": "http://placehold.it/32x32",
"age": 20,
"eyeColor": "brown",
"name": {
"first": "Shaffer",
"last": "Pearson"
},
"company": "DATAGEN",
"email": "shaffer.pearson@datagen.co.uk",
"phone": "+1 (972) 588-2272",
"address": "662 Rewe Street, Starks, California, 9066",
"about": "Aliquip exercitation ad duis irure consectetur magna aliquip amet. Exercitation labore ex laboris non dolor eu. In magna amet non nulla sit laboris do aliqua aliquip. Est elit ipsum ad ea in Lorem mollit Lorem laborum. Ad labore minim aliqua dolore reprehenderit commodo nulla fugiat eiusmod nostrud cillum est. Deserunt minim in non aliqua non.\r\n",
"registered": "Wednesday, January 7, 2015 5:51 PM",
"latitude": -50.359159,
"longitude": -94.01781,
"tags": [
"ea",
"enim",
"commodo",
"magna",
"sunt",
"dolore",
"aute"
],
"range": [
0,
1,
2,
3,
4,
5,
6,
7,
8,
9
],
"friends": [
{
"id": 0,
"name": "Holloway Kim"
},
{
"id": 1,
"name": "Clark Medina"
},
{
"id": 2,
"name": "Rosemarie Salazar"
}
],
"greeting": "Hello, Shaffer! You have 9 unread messages.",
"favoriteFruit": "apple"
}

View File

@ -0,0 +1,42 @@
{
"type": "array",
"minItems": 1,
"items": {
"required": ["_id", "index", "guid", "isActive", "balance", "picture", "age", "eyeColor", "name", "company", "email", "phone" ,"address", "about", "registered", "latitude", "longitude", "tags", "range", "friends", "greeting", "favoriteFruit"],
"_id": {"type": "string"},
"index": {"type": "integer"},
"guid": {"type": "string"},
"isActive": { "type": "boolean"},
"balance": {"type": "string"},
"picture": {"type": "string"},
"age": {"type": "integer"},
"eyeColor": {"type": "string"},
"name": {
"type": "object",
"properties": {
"first": {"type":"string"},
"last": {"type":"string"}
}
},
"company": {"type": "string"},
"email": {"type": "string"},
"phone": {"type": "string"},
"address": {"type": "string"},
"about": {"type": "string"},
"registered": {"type": "string"},
"latitude": {"type": "number"},
"longitude": {"type": "number"},
"tags": {"type": "array"},
"range": {"type": "array"},
"friends": {
"type": "array",
"required": ["id", "me"],
"properties": {
"id": {"type":"integer"},
"name": {"type":"string"}
}
},
"greeting": {"type": "string"},
"favoriteFruit": {"type": "string"}
}
}