NIFI-11627 Added JsonSchemaRegistry for ValidateJson

- Added nifi-json-schema-api to nifi-commons
- Added StandardJsonSchemaRegistry implementation of JsonSchemaRegistry
- Added strategy configuration properties to ValidateJson

This closes #8005

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2023-11-10 17:36:42 +00:00 committed by exceptionfactory
parent 69df3f0f66
commit 6bca79cb37
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
15 changed files with 815 additions and 68 deletions

View File

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-json-schema-api</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.json.schema;
import java.util.Objects;
/**
 * Immutable pairing of a JSON schema's raw text with the draft version it is written against.
 */
public class JsonSchema {

    private final SchemaVersion schemaVersion;
    private final String schemaText;

    /**
     * Creates a schema holder.
     *
     * @param schemaVersion draft version of the schema; must not be null
     * @param schemaText    textual content of the schema; must not be null
     * @throws NullPointerException if either argument is null
     */
    public JsonSchema(SchemaVersion schemaVersion, String schemaText) {
        // Version is checked before text, so a double-null call reports the version first.
        this.schemaVersion = Objects.requireNonNull(schemaVersion, "Schema version cannot be null");
        this.schemaText = Objects.requireNonNull(schemaText, "The text of the schema cannot be null");
    }

    /**
     * @return the draft version supplied at construction
     */
    public SchemaVersion getSchemaVersion() {
        return this.schemaVersion;
    }

    /**
     * @return the schema text supplied at construction
     */
    public String getSchemaText() {
        return this.schemaText;
    }
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.json.schema;
import org.apache.nifi.components.DescribedValue;
/**
 * JSON Schema draft versions selectable in component properties.
 * Each constant carries a display name, a description and the URI string associated with the draft.
 */
public enum SchemaVersion implements DescribedValue {

    DRAFT_4("Draft Version 4", "Draft 4", "https://json-schema.org/draft-04/schema"),

    DRAFT_6("Draft Version 6", "Draft 6", "https://json-schema.org/draft-06/schema"),

    DRAFT_7("Draft Version 7", "Draft 7", "https://json-schema.org/draft-07/schema"),

    DRAFT_2019_09("Draft Version 2019-09", "Draft 2019-09", "https://json-schema.org/draft/2019-09/schema"),

    DRAFT_2020_12("Draft Version 2020-12", "Draft 2020-12", "https://json-schema.org/draft/2020-12/schema");

    private final String description;
    private final String displayName;
    private final String uri;

    /**
     * @param versionDescription longer description of the draft
     * @param versionDisplayName short label of the draft
     * @param versionUri         URI string associated with the draft
     */
    SchemaVersion(String versionDescription, String versionDisplayName, String versionUri) {
        this.description = versionDescription;
        this.displayName = versionDisplayName;
        this.uri = versionUri;
    }

    /**
     * @return the enum constant name, used as the property value
     */
    @Override
    public String getValue() {
        return name();
    }

    /**
     * @return the short label of this draft
     */
    @Override
    public String getDisplayName() {
        return this.displayName;
    }

    /**
     * @return the longer description of this draft
     */
    @Override
    public String getDescription() {
        return this.description;
    }

    /**
     * @return the URI string associated with this draft
     */
    public String getUri() {
        return this.uri;
    }
}

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-json-schema-shared</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-schema-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.json.schema.SchemaVersion;
/**
 * Shared property definitions for components that work with JSON schemas.
 */
public interface JsonSchemaRegistryComponent {

    /**
     * Required property selecting the JSON Schema draft version; defaults to Draft 2020-12.
     */
    PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor.Builder()
            .name("JSON Schema Version")
            .displayName("JSON Schema Version")
            .description("The JSON schema specification")
            .required(true)
            .allowableValues(SchemaVersion.class)
            .defaultValue(SchemaVersion.DRAFT_2020_12.getValue())
            .build();
}

View File

@ -30,6 +30,8 @@
<module>nifi-flowfile-packager</module>
<module>nifi-flow-encryptor</module>
<module>nifi-hl7-query-language</module>
<module>nifi-json-schema-api</module>
<module>nifi-json-schema-shared</module>
<module>nifi-json-utils</module>
<module>nifi-jetty-configuration</module>
<module>nifi-kubernetes-client</module>

View File

@ -44,6 +44,11 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-schema-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@ -52,5 +57,20 @@ language governing permissions and limitations under the License. -->
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator</artifactId>
<version>1.0.87</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-schema-shared</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schemaregistry.services;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SpecVersion;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.json.schema.JsonSchema;
import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.json.schema.SchemaVersion;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Tags({"schema", "registry", "json"})
@CapabilityDescription("Provides a service for registering and accessing JSON schemas. One can register a schema "
+ "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
+ "representation of the actual schema following the syntax and semantics of the JSON Schema format. "
+ "Empty schemas and schemas only consisting of whitespace are not acceptable schemas."
+ "The registry is heterogeneous registry as it can store schemas of different schema draft versions. "
+ "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version "
+ "which is currently is set, is what the schema is saved as.")
@DynamicProperty(name = "Schema Name", value = "Schema Content",
description = "Adds a named schema using the JSON string representation of a JSON schema",
expressionLanguageScope = ExpressionLanguageScope.NONE)
public class StandardJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry, JsonSchemaRegistryComponent {
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.singletonList(SCHEMA_VERSION);
private final ConcurrentMap<String, JsonSchema> jsonSchemas;
private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> schemaFactories;
private volatile SchemaVersion schemaVersion;
public StandardJsonSchemaRegistry() {
jsonSchemas = new ConcurrentHashMap<>();
schemaFactories = Arrays.stream(SchemaVersion.values())
.collect(Collectors.toConcurrentMap(Function.identity(),
schemaDraftVersion -> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())));
schemaVersion = SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue());
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && !newValue.equals(oldValue)) {
schemaVersion = SchemaVersion.valueOf(newValue);
} else if(descriptor.isDynamic() && isBlank(newValue)) {
jsonSchemas.remove(descriptor.getName());
} else if (descriptor.isDynamic() && isNotBlank(newValue)) {
try {
final String schemaName = descriptor.getName();
final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion);
jsonSchemaFactory.getSchema(newValue);
jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, newValue));
} catch (final Exception e) {
getLogger().debug("Exception thrown when changing value of schema name '{}' from '{}' to '{}'",
descriptor.getName(), oldValue, newValue, e);
}
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Set<ValidationResult> results = new HashSet<>();
final boolean noSchemasConfigured = validationContext.getProperties().keySet().stream()
.noneMatch(PropertyDescriptor::isDynamic);
if (noSchemasConfigured) {
results.add(new ValidationResult.Builder()
.subject("Supported Dynamic Property Descriptor")
.valid(false)
.explanation("There must be at least one JSON schema specified")
.build());
} else {
// Iterate over dynamic properties, validating only newly added schemas, and adding results
schemaVersion = SchemaVersion.valueOf(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION).getValue());
validationContext.getProperties().entrySet().stream()
.filter(entry -> entry.getKey().isDynamic() && !jsonSchemas.containsKey(entry.getKey().getName()))
.forEach(entry -> {
String subject = entry.getKey().getName();
String input = entry.getValue();
if (isNotBlank(input)) {
try {
final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion);
jsonSchemaFactory.getSchema(input);
} catch (Exception e) {
results.add(new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation("Not a valid JSON Schema: " + e.getMessage())
.build());
}
}
});
}
return results;
}
@Override
public JsonSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException {
JsonSchema jsonSchema = jsonSchemas.get(schemaName);
if (jsonSchema == null) {
throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
}
return jsonSchema;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
}
}

View File

@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.schemaregistry.services.AvroSchemaRegistry
org.apache.nifi.schemaregistry.services.StandardJsonSchemaRegistry

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schemaregistry.services;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.nifi.util.TestRunners;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@link StandardJsonSchemaRegistry} covering custom validation and schema retrieval.
 */
@ExtendWith(MockitoExtension.class)
class TestStandardJsonSchemaRegistry {

    private static final String SCHEMA_NAME = "fooSchema";

    private static final PropertyDescriptor SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR = new PropertyDescriptor.Builder()
            .name(SCHEMA_NAME)
            .required(false)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .dynamic(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    @Mock
    private ValidationContext validationContext;

    @Mock
    private PropertyValue propertyValue;

    private Map<PropertyDescriptor, String> properties;

    private StandardJsonSchemaRegistry delegate;

    @BeforeEach
    void setUp() throws InitializationException {
        properties = new HashMap<>();
        delegate = new StandardJsonSchemaRegistry();
        // Adding the service to a runner initializes it before each test.
        final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
        testRunner.addControllerService("jsonSchemaRegistry", delegate);
    }

    @Test
    void testCustomValidateWithoutAnySchemaSpecified() {
        when(validationContext.getProperties()).thenReturn(properties);

        final Collection<ValidationResult> validationResults = delegate.customValidate(validationContext);

        assertEquals(1, validationResults.size());
        final ValidationResult onlyResult = validationResults.iterator().next();
        assertFalse(onlyResult.isValid());
        assertTrue(onlyResult.getExplanation().contains("at least one JSON schema specified"));
    }

    @ParameterizedTest(name = "{3}")
    @MethodSource("dynamicProperties")
    void testCustomValidateWithSchemaRegistrationFromDynamicProperties(PropertyDescriptor propertyDescriptor, String schema, int numValidationErrors) {
        when(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION)).thenReturn(propertyValue);
        when(propertyValue.getValue()).thenReturn(JsonSchemaRegistryComponent.SCHEMA_VERSION.getDefaultValue());
        when(validationContext.getProperties()).thenReturn(properties);

        properties.put(propertyDescriptor, schema);
        for (final PropertyDescriptor supported : delegate.getSupportedPropertyDescriptors()) {
            properties.put(supported, supported.getDisplayName());
        }

        final int actualErrors = delegate.customValidate(validationContext).size();
        assertEquals(numValidationErrors, actualErrors);
    }

    @ParameterizedTest(name = "{3}")
    @MethodSource("dynamicProperties")
    void testSchemaRetrieval(PropertyDescriptor propertyDescriptor, String schema, int numValidationErrors) throws SchemaNotFoundException {
        delegate.onPropertyModified(propertyDescriptor, null, schema);

        if (numValidationErrors != 0) {
            // An invalid schema is never registered, so the lookup must fail.
            assertThrows(SchemaNotFoundException.class, () -> delegate.retrieveSchema(SCHEMA_NAME));
            return;
        }

        assertDoesNotThrow(() -> delegate.retrieveSchema(SCHEMA_NAME));
        assertNotNull(delegate.retrieveSchema(SCHEMA_NAME));
    }

    /**
     * Supplies descriptor, schema text, expected validation error count and a display label for each case.
     */
    private static Stream<Arguments> dynamicProperties() {
        final Arguments emptyObject = Arguments.of(SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, "{}", 0, "empty object schema");
        final Arguments emptyArray = Arguments.of(SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, "[]", 0, "empty array schema");
        final Arguments notASchema = Arguments.of(SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, "not a schema", 1, "non whitespace");
        return Stream.of(emptyObject, emptyArray, notASchema);
    }
}

View File

@ -499,6 +499,16 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-schema-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-schema-shared</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -19,9 +19,8 @@ package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.JsonSchema;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SpecVersion.VersionFlag;
import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -39,23 +38,35 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.json.schema.JsonSchema;
import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
import org.apache.nifi.json.schema.SchemaVersion;
import org.apache.nifi.schemaregistry.services.JsonSchemaRegistry;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@SideEffectFree
@SupportsBatching
@ -79,23 +90,22 @@ import java.util.Set;
}
)
public class ValidateJson extends AbstractProcessor {
public enum SchemaVersion implements DescribedValue {
DRAFT_4("Draft Version 4", "Draft 4", VersionFlag.V4),
DRAFT_6("Draft Version 6", "Draft 6", VersionFlag.V6),
DRAFT_7("Draft Version 7", "Draft 7", VersionFlag.V7),
DRAFT_2019_09("Draft Version 2019-09", "Draft 2019-09", VersionFlag.V201909),
DRAFT_2020_12("Draft Version 2020-12", "Draft 2020-12", VersionFlag.V202012);
public enum JsonSchemaStrategy implements DescribedValue {
SCHEMA_NAME_PROPERTY(SCHEMA_NAME_PROPERTY_NAME + " Property",
"The name of the Schema to use is specified by the '" + SCHEMA_NAME_PROPERTY_NAME +
"' Property. The value of this property is used to lookup the Schema in the configured JSON Schema Registry Service."),
SCHEMA_CONTENT_PROPERTY(SCHEMA_CONTENT_PROPERTY_NAME + " Property",
"A URL or file path to the JSON schema or the actual JSON schema is specified by the '" + SCHEMA_CONTENT_PROPERTY_NAME + "' Property. " +
"No matter how the JSON schema is specified, it must be a valid JSON schema");
private final String description;
private final String displayName;
private final VersionFlag versionFlag;
SchemaVersion(String description, String displayName, VersionFlag versionFlag) {
this.description = description;
JsonSchemaStrategy(String displayName, String description) {
this.displayName = displayName;
this.versionFlag = versionFlag;
this.description = description;
}
private final String displayName;
private final String description;
@Override
public String getValue() {
return name();
@ -110,37 +120,62 @@ public class ValidateJson extends AbstractProcessor {
public String getDescription() {
return description;
}
public VersionFlag getVersionFlag() {
return versionFlag;
}
}
public static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
protected static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
private static final String SCHEMA_NAME_PROPERTY_NAME = "Schema Name";
private static final String SCHEMA_CONTENT_PROPERTY_NAME = "JSON Schema";
public static final PropertyDescriptor SCHEMA_CONTENT = new PropertyDescriptor
.Builder().name("JSON Schema")
.displayName("JSON Schema")
.description("The content of a JSON Schema")
public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
.name("Schema Access Strategy")
.displayName("Schema Access Strategy")
.description("Specifies how to obtain the schema that is to be used for interpreting the data.")
.allowableValues(JsonSchemaStrategy.class)
.defaultValue(JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY.getValue())
.required(true)
.build();
public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
.name(SCHEMA_NAME_PROPERTY_NAME)
.displayName(SCHEMA_NAME_PROPERTY_NAME)
.description("Specifies the name of the schema to lookup in the Schema Registry property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${schema.name}")
.dependsOn(SCHEMA_ACCESS_STRATEGY, JsonSchemaStrategy.SCHEMA_NAME_PROPERTY)
.build();
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("JSON Schema Registry")
.displayName("JSON Schema Registry")
.description("Specifies the Controller Service to use for the JSON Schema Registry")
.identifiesControllerService(JsonSchemaRegistry.class)
.required(true)
.dependsOn(SCHEMA_ACCESS_STRATEGY, JsonSchemaStrategy.SCHEMA_NAME_PROPERTY)
.build();
public static final PropertyDescriptor SCHEMA_CONTENT = new PropertyDescriptor.Builder()
.name(SCHEMA_CONTENT_PROPERTY_NAME)
.displayName(SCHEMA_CONTENT_PROPERTY_NAME)
.description("A URL or file path to the JSON schema or the actual JSON schema content")
.required(true)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.URL, ResourceType.TEXT)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(SCHEMA_ACCESS_STRATEGY, JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
.build();
public static final PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor
.Builder().name("Schema Version")
.displayName("Schema Version")
.description("The JSON schema specification")
.required(true)
.allowableValues(SchemaVersion.class)
.defaultValue(SchemaVersion.DRAFT_2020_12.getValue())
public static final PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(JsonSchemaRegistryComponent.SCHEMA_VERSION)
.dependsOn(SCHEMA_ACCESS_STRATEGY, JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
.build();
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
Arrays.asList(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
SCHEMA_ACCESS_STRATEGY,
SCHEMA_NAME,
SCHEMA_REGISTRY,
SCHEMA_CONTENT,
SCHEMA_VERSION
)
);
public static final Relationship REL_VALID = new Relationship.Builder()
@ -158,21 +193,28 @@ public class ValidateJson extends AbstractProcessor {
.description("FlowFiles that cannot be read as JSON are routed to this relationship")
.build();
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(
Arrays.asList(
private static final Set<Relationship> RELATIONSHIPS = Set.of(
REL_VALID,
REL_INVALID,
REL_FAILURE
))
);
private static final ObjectMapper MAPPER;
static {
MAPPER = new ObjectMapper();
MAPPER.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
}
private static final ObjectMapper MAPPER = new ObjectMapper().configure(JsonParser.Feature.ALLOW_COMMENTS, true);
private JsonSchema schema;
private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> schemaFactories = Arrays.stream(SchemaVersion.values())
.collect(
Collectors.toConcurrentMap(
Function.identity(),
schemaDraftVersion -> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())
)
);
private volatile com.networknt.schema.JsonSchema schema;
private volatile JsonSchemaRegistry jsonSchemaRegistry;
/**
 * Renames the property formerly called "Schema Version" to the current name of {@code SCHEMA_VERSION}.
 *
 * @param config the property configuration being migrated
 */
@Override
public void migrateProperties(final PropertyConfiguration config) {
    final String currentName = SCHEMA_VERSION.getName();
    config.renameProperty("Schema Version", currentName);
}
@Override
public Set<Relationship> getRelationships() {
@ -184,14 +226,40 @@ public class ValidateJson extends AbstractProcessor {
return PROPERTIES;
}
/**
 * Reports a failure when the property required by the selected schema access strategy is not set:
 * the schema registry for the name strategy, the schema content for the content strategy.
 *
 * @param validationContext the context carrying the configured properties
 * @return at most one validation failure; empty when the required property is set
 */
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    final Collection<ValidationResult> problems = new ArrayList<>();
    final String strategy = getSchemaAccessStrategy(validationContext);

    // Work out which required property, if any, is missing for the selected strategy.
    final PropertyDescriptor missing;
    if (isNameStrategy(validationContext) && !validationContext.getProperty(SCHEMA_REGISTRY).isSet()) {
        missing = SCHEMA_REGISTRY;
    } else if (isContentStrategy(validationContext) && !validationContext.getProperty(SCHEMA_CONTENT).isSet()) {
        missing = SCHEMA_CONTENT;
    } else {
        missing = null;
    }

    if (missing != null) {
        problems.add(new ValidationResult.Builder()
                .subject(missing.getDisplayName())
                .explanation(getPropertyValidateMessage(strategy, missing))
                .valid(false)
                .build());
    }
    return problems;
}
/**
 * Prepares schema access when the processor is scheduled.
 * <p>
 * With the schema-name strategy only the configured {@link JsonSchemaRegistry} is resolved here; the schema
 * itself is retrieved in onTrigger. With the schema-content strategy the schema is read from the
 * SCHEMA_CONTENT resource and compiled with the factory cached for the configured schema version.
 *
 * @param context provides the configured property values
 * @throws IOException if reading or closing the schema content resource fails
 */
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
    if (isNameStrategy(context)) {
        jsonSchemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(JsonSchemaRegistry.class);
    } else if (isContentStrategy(context)) {
        try (final InputStream inputStream = context.getProperty(SCHEMA_CONTENT).asResource().read()) {
            final SchemaVersion schemaVersion = SchemaVersion.valueOf(context.getProperty(SCHEMA_VERSION).getValue());
            // Use the per-version factory cached in schemaFactories, the same lookup onTrigger performs.
            final JsonSchemaFactory factory = schemaFactories.get(schemaVersion);
            schema = factory.getSchema(inputStream);
        }
    }
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@ -200,6 +268,20 @@ public class ValidateJson extends AbstractProcessor {
return;
}
if (isNameStrategy(context)) {
try {
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final JsonSchema jsonSchema = jsonSchemaRegistry.retrieveSchema(schemaName);
final JsonSchemaFactory factory = schemaFactories.get(jsonSchema.getSchemaVersion());
schema = factory.getSchema(jsonSchema.getSchemaText());
} catch (Exception e) {
getLogger().error("Could not retrieve JSON schema for {}", flowFile, e);
session.getProvenanceReporter().route(flowFile, REL_FAILURE);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
try (final InputStream in = session.read(flowFile)) {
final JsonNode node = MAPPER.readTree(in);
final Set<ValidationMessage> errors = schema.validate(node);
@ -221,4 +303,22 @@ public class ValidateJson extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
}
}
/**
 * Builds the explanation shown when a strategy-specific property is missing.
 *
 * @param schemaAccessStrategy value of the selected Schema Access Strategy
 * @param property             descriptor of the property that must be set
 * @return the validation message naming the strategy and the property's display name
 */
private String getPropertyValidateMessage(String schemaAccessStrategy, PropertyDescriptor property) {
    final String displayName = property.getDisplayName();
    final String prefix = "The '" + schemaAccessStrategy + "' Schema Access Strategy requires that the ";
    return prefix + displayName + " property be set.";
}
/**
 * Tells whether the configured Schema Access Strategy is the schema-name strategy.
 *
 * @param context source of the configured property values
 * @return true when the configured value equals SCHEMA_NAME_PROPERTY's value
 */
private boolean isNameStrategy(PropertyContext context) {
    final String configured = getSchemaAccessStrategy(context);
    final String nameStrategy = JsonSchemaStrategy.SCHEMA_NAME_PROPERTY.getValue();
    return nameStrategy.equals(configured);
}
/**
 * Reads the raw value of the SCHEMA_ACCESS_STRATEGY property.
 *
 * @param context source of the configured property values
 * @return the configured strategy value
 */
private String getSchemaAccessStrategy(PropertyContext context) {
    final String configuredStrategy = context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
    return configuredStrategy;
}
/**
 * Tells whether the configured Schema Access Strategy is the schema-content strategy.
 *
 * @param context source of the configured property values
 * @return true when the configured value equals SCHEMA_CONTENT_PROPERTY's value
 */
private boolean isContentStrategy(PropertyContext context) {
    final String configured = getSchemaAccessStrategy(context);
    final String contentStrategy = JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY.getValue();
    return contentStrategy.equals(configured);
}
}

View File

@ -17,12 +17,22 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.json.schema.JsonSchema;
import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.json.schema.SchemaVersion;
import org.apache.nifi.schemaregistry.services.JsonSchemaRegistry;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.opentest4j.AssertionFailedError;
import java.io.IOException;
@ -30,18 +40,22 @@ import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class TestValidateJson {
// Sample document and schema loaded from the test resources.
private static final String JSON = getFileContent("simple-example.json");
private static final String SIMPLE_SCHEMA = getFileContent("schema-simple-example.json");
// Content that is not parseable as JSON.
private static final String NON_JSON = "Not JSON";
// Schema version value taken from org.apache.nifi.json.schema.SchemaVersion.
private static final String SCHEMA_VERSION = SchemaVersion.DRAFT_7.getValue();
private TestRunner runner;
@BeforeEach
@ -49,11 +63,39 @@ class TestValidateJson {
runner = TestRunners.newTestRunner(ValidateJson.class);
}
// Selecting a strategy without its required property must make the run fail with the expected explanation.
// The third argument supplied by customValidateArgs is consumed only as the display name ("{2}").
@ParameterizedTest(name = "{2}")
@MethodSource("customValidateArgs")
void testCustomValidateMissingProperty(String strategy, String errMsg) {
    runner.setProperty(ValidateJson.SCHEMA_ACCESS_STRATEGY, strategy);
    runner.enqueue(JSON);

    final AssertionFailedError failure = assertThrows(AssertionFailedError.class, () -> runner.run());
    final String failureMessage = failure.getMessage();
    assertTrue(failureMessage.contains(errMsg));
}
// A document matching the schema is routed to REL_VALID with a single ROUTE provenance event.
@Test
void testPassSchema() {
    final String schemaPath = getFilePath("schema-simple-example.json");
    runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
    // The schema version is set once, through the shared JsonSchemaRegistryComponent descriptor.
    runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
    runner.enqueue(JSON);
    runner.run();

    runner.assertTransferCount(ValidateJson.REL_FAILURE, 0);
    runner.assertTransferCount(ValidateJson.REL_INVALID, 0);
    runner.assertTransferCount(ValidateJson.REL_VALID, 1);
    assertValidationErrors(ValidateJson.REL_VALID, false);
    assertEquals(1, runner.getProvenanceEvents().size());
    assertEquals(ProvenanceEventType.ROUTE, runner.getProvenanceEvents().get(0).getEventType());
}
@Test
void testNoSchemaVersionSpecified() {
final String schemaPath = getFilePath("schema-simple-example.json");
runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
runner.enqueue(JSON);
@ -71,7 +113,7 @@ class TestValidateJson {
@Test
void testEmptySchema() {
runner.setProperty(ValidateJson.SCHEMA_CONTENT, "{}");
runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
runner.run();
@ -89,7 +131,7 @@ class TestValidateJson {
void testAllUnknownKeywordsSchema() {
runner.setProperty(ValidateJson.SCHEMA_CONTENT,
"{\"fruit\": \"Apple\", \"size\": \"Large\", \"color\": \"Red\"}");
runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
runner.run();
@ -107,7 +149,7 @@ class TestValidateJson {
void testPatternSchemaCheck() {
final String schemaPath = getFilePath("schema-simple-example-unmatched-pattern.json");
runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
runner.run();
@ -125,7 +167,7 @@ class TestValidateJson {
void testMissingRequiredValue() {
final String schema = getFileContent("schema-simple-example-missing-required.json");
runner.setProperty(ValidateJson.SCHEMA_CONTENT, schema);
runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
runner.run();
@ -141,9 +183,8 @@ class TestValidateJson {
@Test
void testInvalidJson() {
final String schema = getFileContent("schema-simple-example.json");
runner.setProperty(ValidateJson.SCHEMA_CONTENT, schema);
runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
runner.setProperty(ValidateJson.SCHEMA_CONTENT, SIMPLE_SCHEMA);
runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(NON_JSON);
runner.run();
@ -160,7 +201,7 @@ class TestValidateJson {
@Test
void testNonExistingSchema() {
runner.setProperty(ValidateJson.SCHEMA_CONTENT, "not-found.json");
runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
assertThrows(AssertionFailedError.class, () -> runner.run());
@ -169,7 +210,7 @@ class TestValidateJson {
@Test
void testBadSchema() {
runner.setProperty(ValidateJson.SCHEMA_CONTENT, NON_JSON);
runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
assertThrows(AssertionFailedError.class, () -> runner.run());
@ -179,7 +220,7 @@ class TestValidateJson {
void testJsonWithComments() {
final String schemaPath = getFilePath("schema-simple-example.json");
runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(getFileContent("simple-example-with-comments.json"));
@ -191,6 +232,28 @@ class TestValidateJson {
assertValidationErrors(ValidateJson.REL_VALID, false);
}
// With the schema-name strategy the schema is looked up in the registry by the "schema.name" attribute.
@Test
void testSchemaRetrievalFromRegistry() throws InitializationException {
    final String registryId = "registry";
    final String knownSchema = "someSchema";

    // Register and enable the stub registry before wiring it into the processor.
    final JsonSchemaRegistry registry = new SampleJsonSchemaRegistry(registryId, knownSchema);
    runner.addControllerService(registryId, registry);
    runner.enableControllerService(registry);
    runner.assertValid(registry);

    runner.setProperty(ValidateJson.SCHEMA_ACCESS_STRATEGY, ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY.getValue());
    runner.setProperty(ValidateJson.SCHEMA_REGISTRY, registryId);

    final Map<String, String> flowFileAttributes = new HashMap<>();
    flowFileAttributes.put("schema.name", knownSchema);
    runner.enqueue(JSON, flowFileAttributes);
    runner.run();

    runner.assertTransferCount(ValidateJson.REL_FAILURE, 0);
    runner.assertTransferCount(ValidateJson.REL_INVALID, 0);
    runner.assertTransferCount(ValidateJson.REL_VALID, 1);
}
private void assertValidationErrors(Relationship relationship, boolean expected) {
final Map<String, String> attributes = runner.getFlowFilesForRelationship(relationship).get(0).getAttributes();
@ -202,6 +265,13 @@ class TestValidateJson {
}
}
// Argument sets for testCustomValidateMissingProperty: strategy value, expected message fragment, display name.
private static Stream<Arguments> customValidateArgs() {
    final Arguments missingRegistry = Arguments.of(
            ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY.getValue(),
            "requires that the JSON Schema Registry property be set",
            "no registry set");
    final Arguments missingContent = Arguments.of(
            ValidateJson.JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY.getValue(),
            "requires that the JSON Schema property be set",
            "no content specified");
    return Stream.of(missingRegistry, missingContent);
}
private static String getFilePath(final String filename) {
final String path = getRelativeResourcePath(filename);
final URL url = Objects.requireNonNull(TestValidateJson.class.getResource(path), "Resource not found");
@ -221,4 +291,28 @@ class TestValidateJson {
// Builds "/<test class simple name>/<filename>", the classpath location of a test resource.
private static String getRelativeResourcePath(final String filename) {
    final String resourceDirectory = TestValidateJson.class.getSimpleName();
    return "/" + resourceDirectory + "/" + filename;
}
/**
 * Stub {@link JsonSchemaRegistry} for tests. It recognises exactly one schema name and answers it with an
 * empty ("{}") DRAFT_2020_12 schema; any other name raises {@link SchemaNotFoundException}.
 */
private static class SampleJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry {
    private final String identifier;
    private final String schemaName;

    public SampleJsonSchemaRegistry(String identifier, String schemaName) {
        this.identifier = identifier;
        this.schemaName = schemaName;
    }

    @Override
    public String getIdentifier() {
        return identifier;
    }

    @Override
    public JsonSchema retrieveSchema(String schemaName) throws SchemaNotFoundException {
        // Guard clause: only the name given at construction time is known.
        final boolean known = this.schemaName.equals(schemaName);
        if (!known) {
            throw new SchemaNotFoundException("");
        }
        return new JsonSchema(SchemaVersion.DRAFT_2020_12, "{}");
    }
}
}

View File

@ -31,5 +31,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-schema-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schemaregistry.services;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.json.schema.JsonSchema;
import java.io.IOException;
/**
 * A {@link ControllerService} that resolves JSON schemas by name, allowing components to
 * expose an internal Schema Registry and/or integrate with an external one.
 */
public interface JsonSchemaRegistry extends ControllerService {
/**
 * Retrieves the schema registered under the provided schema name.
 *
 * @param schemaName the name of the schema to look up
 * @return the schema registered under the given name
 * @throws IOException if unable to communicate with the backing store
 * @throws SchemaNotFoundException if no schema can be found for the given name
 */
JsonSchema retrieveSchema(String schemaName) throws IOException, SchemaNotFoundException;
}