diff --git a/nifi-commons/nifi-json-schema-api/pom.xml b/nifi-commons/nifi-json-schema-api/pom.xml
new file mode 100644
index 0000000000..eacabf87a7
--- /dev/null
+++ b/nifi-commons/nifi-json-schema-api/pom.xml
@@ -0,0 +1,33 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-commons
+ 2.0.0-SNAPSHOT
+
+ nifi-json-schema-api
+
+
+ org.apache.nifi
+ nifi-api
+ 2.0.0-SNAPSHOT
+
+
+
\ No newline at end of file
diff --git a/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/JsonSchema.java b/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/JsonSchema.java
new file mode 100644
index 0000000000..94b1fd18a3
--- /dev/null
+++ b/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/JsonSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json.schema;
+
+import java.util.Objects;
+
+public class JsonSchema {
+ private final SchemaVersion schemaVersion;
+ private final String schemaText;
+
+ public JsonSchema(SchemaVersion schemaVersion, String schemaText) {
+ Objects.requireNonNull(schemaVersion, "Schema version cannot be null");
+ Objects.requireNonNull(schemaText, "The text of the schema cannot be null");
+ this.schemaVersion = schemaVersion;
+ this.schemaText = schemaText;
+ }
+
+ public SchemaVersion getSchemaVersion() {
+ return schemaVersion;
+ }
+
+ public String getSchemaText() {
+ return schemaText;
+ }
+}
diff --git a/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/SchemaVersion.java b/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/SchemaVersion.java
new file mode 100644
index 0000000000..2f386a63e9
--- /dev/null
+++ b/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/SchemaVersion.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json.schema;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum SchemaVersion implements DescribedValue {
+ DRAFT_4("Draft Version 4", "Draft 4", "https://json-schema.org/draft-04/schema"),
+ DRAFT_6("Draft Version 6", "Draft 6", "https://json-schema.org/draft-06/schema"),
+ DRAFT_7("Draft Version 7", "Draft 7", "https://json-schema.org/draft-07/schema"),
+ DRAFT_2019_09("Draft Version 2019-09", "Draft 2019-09", "https://json-schema.org/draft/2019-09/schema"),
+ DRAFT_2020_12("Draft Version 2020-12", "Draft 2020-12", "https://json-schema.org/draft/2020-12/schema");
+
+ private final String description;
+ private final String displayName;
+ private final String uri;
+
+ SchemaVersion(String description, String displayName, String uri) {
+ this.description = description;
+ this.displayName = displayName;
+ this.uri = uri;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+}
diff --git a/nifi-commons/nifi-json-schema-shared/pom.xml b/nifi-commons/nifi-json-schema-shared/pom.xml
new file mode 100644
index 0000000000..35149bd83f
--- /dev/null
+++ b/nifi-commons/nifi-json-schema-shared/pom.xml
@@ -0,0 +1,37 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-commons
+ 2.0.0-SNAPSHOT
+
+ nifi-json-schema-shared
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-json-schema-api
+ 2.0.0-SNAPSHOT
+
+
+
\ No newline at end of file
diff --git a/nifi-commons/nifi-json-schema-shared/src/main/java/org/apache/nifi/schema/access/JsonSchemaRegistryComponent.java b/nifi-commons/nifi-json-schema-shared/src/main/java/org/apache/nifi/schema/access/JsonSchemaRegistryComponent.java
new file mode 100644
index 0000000000..0fc53bed30
--- /dev/null
+++ b/nifi-commons/nifi-json-schema-shared/src/main/java/org/apache/nifi/schema/access/JsonSchemaRegistryComponent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.json.schema.SchemaVersion;
+
+public interface JsonSchemaRegistryComponent {
+ PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor
+ .Builder()
+ .name("JSON Schema Version")
+ .displayName("JSON Schema Version")
+ .description("The JSON schema specification")
+ .required(true)
+ .allowableValues(SchemaVersion.class)
+ .defaultValue(SchemaVersion.DRAFT_2020_12.getValue())
+ .build();
+}
diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml
index dddf9b7d86..034b39f123 100644
--- a/nifi-commons/pom.xml
+++ b/nifi-commons/pom.xml
@@ -30,6 +30,8 @@
nifi-flowfile-packager
nifi-flow-encryptor
nifi-hl7-query-language
+ nifi-json-schema-api
+ nifi-json-schema-shared
nifi-json-utils
nifi-jetty-configuration
nifi-kubernetes-client
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
index b879530b07..be9603ff9b 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
@@ -44,6 +44,11 @@ language governing permissions and limitations under the License. -->
org.apache.nifi
nifi-utils
+
+ org.apache.nifi
+ nifi-json-schema-api
+ 2.0.0-SNAPSHOT
+
org.apache.commons
commons-lang3
@@ -52,5 +57,20 @@ language governing permissions and limitations under the License. -->
commons-io
commons-io
+
+ com.networknt
+ json-schema-validator
+ 1.0.87
+
+
+ org.apache.nifi
+ nifi-json-schema-shared
+ 2.0.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/StandardJsonSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/StandardJsonSchemaRegistry.java
new file mode 100644
index 0000000000..3e877ef8e1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/StandardJsonSchemaRegistry.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.json.schema.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.json.schema.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing JSON schemas. One can register a schema "
+ + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
+ + "representation of the actual schema following the syntax and semantics of the JSON Schema format. "
+ + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas."
+ + "The registry is heterogeneous registry as it can store schemas of different schema draft versions. "
+ + "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version "
+ + "which is currently is set, is what the schema is saved as.")
+@DynamicProperty(name = "Schema Name", value = "Schema Content",
+ description = "Adds a named schema using the JSON string representation of a JSON schema",
+ expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class StandardJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry, JsonSchemaRegistryComponent {
+
+ private static final List PROPERTY_DESCRIPTORS = Collections.singletonList(SCHEMA_VERSION);
+
+ private final ConcurrentMap jsonSchemas;
+ private final ConcurrentMap schemaFactories;
+ private volatile SchemaVersion schemaVersion;
+
+ public StandardJsonSchemaRegistry() {
+ jsonSchemas = new ConcurrentHashMap<>();
+ schemaFactories = Arrays.stream(SchemaVersion.values())
+ .collect(Collectors.toConcurrentMap(Function.identity(),
+ schemaDraftVersion -> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())));
+ schemaVersion = SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue());
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && !newValue.equals(oldValue)) {
+ schemaVersion = SchemaVersion.valueOf(newValue);
+ } else if(descriptor.isDynamic() && isBlank(newValue)) {
+ jsonSchemas.remove(descriptor.getName());
+ } else if (descriptor.isDynamic() && isNotBlank(newValue)) {
+ try {
+ final String schemaName = descriptor.getName();
+ final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion);
+ jsonSchemaFactory.getSchema(newValue);
+ jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, newValue));
+ } catch (final Exception e) {
+ getLogger().debug("Exception thrown when changing value of schema name '{}' from '{}' to '{}'",
+ descriptor.getName(), oldValue, newValue, e);
+ }
+ }
+ }
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final Set results = new HashSet<>();
+
+ final boolean noSchemasConfigured = validationContext.getProperties().keySet().stream()
+ .noneMatch(PropertyDescriptor::isDynamic);
+ if (noSchemasConfigured) {
+ results.add(new ValidationResult.Builder()
+ .subject("Supported Dynamic Property Descriptor")
+ .valid(false)
+ .explanation("There must be at least one JSON schema specified")
+ .build());
+ } else {
+ // Iterate over dynamic properties, validating only newly added schemas, and adding results
+ schemaVersion = SchemaVersion.valueOf(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION).getValue());
+ validationContext.getProperties().entrySet().stream()
+ .filter(entry -> entry.getKey().isDynamic() && !jsonSchemas.containsKey(entry.getKey().getName()))
+ .forEach(entry -> {
+ String subject = entry.getKey().getName();
+ String input = entry.getValue();
+ if (isNotBlank(input)) {
+ try {
+ final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion);
+ jsonSchemaFactory.getSchema(input);
+ } catch (Exception e) {
+ results.add(new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(false)
+ .explanation("Not a valid JSON Schema: " + e.getMessage())
+ .build());
+ }
+ }
+ });
+ }
+
+ return results;
+ }
+
+ @Override
+ public JsonSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException {
+ JsonSchema jsonSchema = jsonSchemas.get(schemaName);
+ if (jsonSchema == null) {
+ throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
+ }
+ return jsonSchema;
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dynamic(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index a000cd743d..1cb051bdac 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.schemaregistry.services.AvroSchemaRegistry
\ No newline at end of file
+org.apache.nifi.schemaregistry.services.AvroSchemaRegistry
+org.apache.nifi.schemaregistry.services.StandardJsonSchemaRegistry
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestStandardJsonSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestStandardJsonSchemaRegistry.java
new file mode 100644
index 0000000000..87560ee01e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestStandardJsonSchemaRegistry.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.nifi.util.TestRunners;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestStandardJsonSchemaRegistry {
+ private static final String SCHEMA_NAME = "fooSchema";
+ private static final PropertyDescriptor SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR = new PropertyDescriptor.Builder()
+ .name(SCHEMA_NAME)
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dynamic(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ @Mock
+ private ValidationContext validationContext;
+ @Mock
+ private PropertyValue propertyValue;
+ private Map properties;
+ private StandardJsonSchemaRegistry delegate;
+
+ @BeforeEach
+ void setUp() throws InitializationException {
+ properties = new HashMap<>();
+ delegate = new StandardJsonSchemaRegistry();
+ TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ runner.addControllerService("jsonSchemaRegistry", delegate);
+ }
+
+ @Test
+ void testCustomValidateWithoutAnySchemaSpecified() {
+ when(validationContext.getProperties()).thenReturn(properties);
+
+ final Collection results = delegate.customValidate(validationContext);
+ assertEquals(1, results.size());
+ final ValidationResult result = results.iterator().next();
+ assertFalse(result.isValid());
+ assertTrue(result.getExplanation().contains("at least one JSON schema specified"));
+ }
+
+ @ParameterizedTest(name = "{3}")
+ @MethodSource("dynamicProperties")
+ void testCustomValidateWithSchemaRegistrationFromDynamicProperties(PropertyDescriptor propertyDescriptor, String schema, int numValidationErrors) {
+ when(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION)).thenReturn(propertyValue);
+ when(propertyValue.getValue()).thenReturn(JsonSchemaRegistryComponent.SCHEMA_VERSION.getDefaultValue());
+ when(validationContext.getProperties()).thenReturn(properties);
+ properties.put(propertyDescriptor, schema);
+ delegate.getSupportedPropertyDescriptors().forEach(prop -> properties.put(prop, prop.getDisplayName()));
+
+ assertEquals(numValidationErrors, delegate.customValidate(validationContext).size());
+ }
+
+ @ParameterizedTest(name = "{3}")
+ @MethodSource("dynamicProperties")
+ void testSchemaRetrieval(PropertyDescriptor propertyDescriptor, String schema, int numValidationErrors) throws SchemaNotFoundException {
+ delegate.onPropertyModified(propertyDescriptor, null, schema);
+ boolean validSchema = numValidationErrors == 0;
+
+ if(validSchema) {
+ assertDoesNotThrow(() -> delegate.retrieveSchema(SCHEMA_NAME));
+ assertNotNull(delegate.retrieveSchema(SCHEMA_NAME));
+ } else {
+ assertThrows(SchemaNotFoundException.class, () -> delegate.retrieveSchema(SCHEMA_NAME));
+ }
+ }
+
+ private static Stream dynamicProperties() {
+ return Stream.of(
+ Arguments.of(SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, "{}", 0, "empty object schema"),
+ Arguments.of(SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, "[]", 0, "empty array schema"),
+ Arguments.of(SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, "not a schema", 1, "non whitespace")
+ );
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 5becb29639..7bf8071ead 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -499,6 +499,16 @@
mockwebserver
test
+
+ org.apache.nifi
+ nifi-json-schema-api
+ 2.0.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-json-schema-shared
+ 2.0.0-SNAPSHOT
+
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
index 25b1499135..29a10456e9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
@@ -19,9 +19,8 @@ package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.networknt.schema.JsonSchema;
import com.networknt.schema.JsonSchemaFactory;
-import com.networknt.schema.SpecVersion.VersionFlag;
+import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -39,23 +38,35 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.json.schema.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.json.schema.SchemaVersion;
+import org.apache.nifi.schemaregistry.services.JsonSchemaRegistry;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
@SideEffectFree
@SupportsBatching
@@ -79,23 +90,22 @@ import java.util.Set;
}
)
public class ValidateJson extends AbstractProcessor {
- public enum SchemaVersion implements DescribedValue {
- DRAFT_4("Draft Version 4", "Draft 4", VersionFlag.V4),
- DRAFT_6("Draft Version 6", "Draft 6", VersionFlag.V6),
- DRAFT_7("Draft Version 7", "Draft 7", VersionFlag.V7),
- DRAFT_2019_09("Draft Version 2019-09", "Draft 2019-09", VersionFlag.V201909),
- DRAFT_2020_12("Draft Version 2020-12", "Draft 2020-12", VersionFlag.V202012);
+ public enum JsonSchemaStrategy implements DescribedValue {
+ SCHEMA_NAME_PROPERTY(SCHEMA_NAME_PROPERTY_NAME + " Property",
+ "The name of the Schema to use is specified by the '" + SCHEMA_NAME_PROPERTY_NAME +
+ "' Property. The value of this property is used to lookup the Schema in the configured JSON Schema Registry Service."),
+ SCHEMA_CONTENT_PROPERTY(SCHEMA_CONTENT_PROPERTY_NAME + " Property",
+ "A URL or file path to the JSON schema or the actual JSON schema is specified by the '" + SCHEMA_CONTENT_PROPERTY_NAME + "' Property. " +
+ "No matter how the JSON schema is specified, it must be a valid JSON schema");
- private final String description;
- private final String displayName;
- private final VersionFlag versionFlag;
-
- SchemaVersion(String description, String displayName, VersionFlag versionFlag) {
- this.description = description;
+ JsonSchemaStrategy(String displayName, String description) {
this.displayName = displayName;
- this.versionFlag = versionFlag;
+ this.description = description;
}
+ private final String displayName;
+ private final String description;
+
@Override
public String getValue() {
return name();
@@ -110,37 +120,62 @@ public class ValidateJson extends AbstractProcessor {
public String getDescription() {
return description;
}
-
- public VersionFlag getVersionFlag() {
- return versionFlag;
- }
}
- public static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
+ protected static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
+ private static final String SCHEMA_NAME_PROPERTY_NAME = "Schema Name";
+ private static final String SCHEMA_CONTENT_PROPERTY_NAME = "JSON Schema";
- public static final PropertyDescriptor SCHEMA_CONTENT = new PropertyDescriptor
- .Builder().name("JSON Schema")
- .displayName("JSON Schema")
- .description("The content of a JSON Schema")
+ public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Schema Access Strategy")
+ .displayName("Schema Access Strategy")
+ .description("Specifies how to obtain the schema that is to be used for interpreting the data.")
+ .allowableValues(JsonSchemaStrategy.class)
+ .defaultValue(JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY.getValue())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+ .name(SCHEMA_NAME_PROPERTY_NAME)
+ .displayName(SCHEMA_NAME_PROPERTY_NAME)
+ .description("Specifies the name of the schema to lookup in the Schema Registry property")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue("${schema.name}")
+ .dependsOn(SCHEMA_ACCESS_STRATEGY, JsonSchemaStrategy.SCHEMA_NAME_PROPERTY)
+ .build();
+
+ public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
+ .name("JSON Schema Registry")
+ .displayName("JSON Schema Registry")
+ .description("Specifies the Controller Service to use for the JSON Schema Registry")
+ .identifiesControllerService(JsonSchemaRegistry.class)
+ .required(true)
+ .dependsOn(SCHEMA_ACCESS_STRATEGY, JsonSchemaStrategy.SCHEMA_NAME_PROPERTY)
+ .build();
+
+ public static final PropertyDescriptor SCHEMA_CONTENT = new PropertyDescriptor.Builder()
+ .name(SCHEMA_CONTENT_PROPERTY_NAME)
+ .displayName(SCHEMA_CONTENT_PROPERTY_NAME)
+ .description("A URL or file path to the JSON schema or the actual JSON schema content")
.required(true)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.URL, ResourceType.TEXT)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(SCHEMA_ACCESS_STRATEGY, JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
.build();
- public static final PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor
- .Builder().name("Schema Version")
- .displayName("Schema Version")
- .description("The JSON schema specification")
- .required(true)
- .allowableValues(SchemaVersion.class)
- .defaultValue(SchemaVersion.DRAFT_2020_12.getValue())
- .build();
+ public static final PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(JsonSchemaRegistryComponent.SCHEMA_VERSION)
+ .dependsOn(SCHEMA_ACCESS_STRATEGY, JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
+ .build();
- public static final List PROPERTIES = Collections.unmodifiableList(
- Arrays.asList(
- SCHEMA_CONTENT,
- SCHEMA_VERSION
- )
+ private static final List PROPERTIES = List.of(
+ SCHEMA_ACCESS_STRATEGY,
+ SCHEMA_NAME,
+ SCHEMA_REGISTRY,
+ SCHEMA_CONTENT,
+ SCHEMA_VERSION
);
public static final Relationship REL_VALID = new Relationship.Builder()
@@ -158,21 +193,28 @@ public class ValidateJson extends AbstractProcessor {
.description("FlowFiles that cannot be read as JSON are routed to this relationship")
.build();
- public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(
- Arrays.asList(
- REL_VALID,
- REL_INVALID,
- REL_FAILURE
- ))
+ private static final Set RELATIONSHIPS = Set.of(
+ REL_VALID,
+ REL_INVALID,
+ REL_FAILURE
);
- private static final ObjectMapper MAPPER;
- static {
- MAPPER = new ObjectMapper();
- MAPPER.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
- }
+ private static final ObjectMapper MAPPER = new ObjectMapper().configure(JsonParser.Feature.ALLOW_COMMENTS, true);
- private JsonSchema schema;
+ private final ConcurrentMap schemaFactories = Arrays.stream(SchemaVersion.values())
+ .collect(
+ Collectors.toConcurrentMap(
+ Function.identity(),
+ schemaDraftVersion -> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())
+ )
+ );
+ private volatile com.networknt.schema.JsonSchema schema;
+ private volatile JsonSchemaRegistry jsonSchemaRegistry;
+
+ @Override
+ public void migrateProperties(final PropertyConfiguration config) {
+ config.renameProperty("Schema Version", SCHEMA_VERSION.getName());
+ }
@Override
public Set getRelationships() {
@@ -184,12 +226,38 @@ public class ValidateJson extends AbstractProcessor {
return PROPERTIES;
}
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ Collection validationResults = new ArrayList<>();
+ final String schemaAccessStrategy = getSchemaAccessStrategy(validationContext);
+
+ if (isNameStrategy(validationContext) && !validationContext.getProperty(SCHEMA_REGISTRY).isSet()) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(SCHEMA_REGISTRY.getDisplayName())
+ .explanation(getPropertyValidateMessage(schemaAccessStrategy, SCHEMA_REGISTRY))
+ .valid(false)
+ .build());
+ } else if (isContentStrategy(validationContext) && !validationContext.getProperty(SCHEMA_CONTENT).isSet()) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(SCHEMA_CONTENT.getDisplayName())
+ .explanation(getPropertyValidateMessage(schemaAccessStrategy, SCHEMA_CONTENT))
+ .valid(false)
+ .build());
+ }
+
+ return validationResults;
+ }
+
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
- try (final InputStream inputStream = context.getProperty(SCHEMA_CONTENT).asResource().read()) {
- final SchemaVersion schemaVersion = SchemaVersion.valueOf(context.getProperty(SCHEMA_VERSION).getValue());
- final JsonSchemaFactory factory = JsonSchemaFactory.getInstance(schemaVersion.getVersionFlag());
- schema = factory.getSchema(inputStream);
+ if (isNameStrategy(context)) {
+ jsonSchemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(JsonSchemaRegistry.class);
+ } else if (isContentStrategy(context)) {
+ try (final InputStream inputStream = context.getProperty(SCHEMA_CONTENT).asResource().read()) {
+ final SchemaVersion schemaVersion = SchemaVersion.valueOf(context.getProperty(SCHEMA_VERSION).getValue());
+ final JsonSchemaFactory factory = schemaFactories.get(schemaVersion);
+ schema = factory.getSchema(inputStream);
+ }
}
}
@@ -200,6 +268,20 @@ public class ValidateJson extends AbstractProcessor {
return;
}
+ if (isNameStrategy(context)) {
+ try {
+ final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final JsonSchema jsonSchema = jsonSchemaRegistry.retrieveSchema(schemaName);
+ final JsonSchemaFactory factory = schemaFactories.get(jsonSchema.getSchemaVersion());
+ schema = factory.getSchema(jsonSchema.getSchemaText());
+ } catch (Exception e) {
+ getLogger().error("Could not retrieve JSON schema for {}", flowFile, e);
+ session.getProvenanceReporter().route(flowFile, REL_FAILURE);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
try (final InputStream in = session.read(flowFile)) {
final JsonNode node = MAPPER.readTree(in);
final Set errors = schema.validate(node);
@@ -221,4 +303,22 @@ public class ValidateJson extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
}
}
+
+ private String getPropertyValidateMessage(String schemaAccessStrategy, PropertyDescriptor property) {
+ return "The '" + schemaAccessStrategy + "' Schema Access Strategy requires that the " + property.getDisplayName() + " property be set.";
+ }
+
+ private boolean isNameStrategy(PropertyContext context) {
+ final String schemaAccessStrategy = getSchemaAccessStrategy(context);
+ return JsonSchemaStrategy.SCHEMA_NAME_PROPERTY.getValue().equals(schemaAccessStrategy);
+ }
+
+ private String getSchemaAccessStrategy(PropertyContext context) {
+ return context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+ }
+
+ private boolean isContentStrategy(PropertyContext context) {
+ final String schemaAccessStrategy = getSchemaAccessStrategy(context);
+ return JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY.getValue().equals(schemaAccessStrategy);
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
index 14688ce706..c7b44177ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
@@ -17,12 +17,22 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
+import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.json.schema.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.json.schema.SchemaVersion;
+import org.apache.nifi.schemaregistry.services.JsonSchemaRegistry;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.opentest4j.AssertionFailedError;
import java.io.IOException;
@@ -30,18 +40,22 @@ import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class TestValidateJson {
private static final String JSON = getFileContent("simple-example.json");
+ private static final String SIMPLE_SCHEMA = getFileContent("schema-simple-example.json");
private static final String NON_JSON = "Not JSON";
- private static final String SCHEMA_VERSION = ValidateJson.SchemaVersion.DRAFT_7.getValue();
+ private static final String SCHEMA_VERSION = SchemaVersion.DRAFT_7.getValue();
private TestRunner runner;
@BeforeEach
@@ -49,11 +63,39 @@ class TestValidateJson {
runner = TestRunners.newTestRunner(ValidateJson.class);
}
+ @ParameterizedTest(name = "{2}")
+ @MethodSource("customValidateArgs")
+ void testCustomValidateMissingProperty(String strategy, String errMsg) {
+ runner.setProperty(ValidateJson.SCHEMA_ACCESS_STRATEGY, strategy);
+ runner.enqueue(JSON);
+
+ AssertionFailedError e = assertThrows(AssertionFailedError.class, () -> runner.run());
+ assertTrue(e.getMessage().contains(errMsg));
+ }
+
@Test
void testPassSchema() {
final String schemaPath = getFilePath("schema-simple-example.json");
runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
- runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
+
+ runner.enqueue(JSON);
+
+ runner.run();
+
+ runner.assertTransferCount(ValidateJson.REL_FAILURE, 0);
+ runner.assertTransferCount(ValidateJson.REL_INVALID, 0);
+ runner.assertTransferCount(ValidateJson.REL_VALID, 1);
+
+ assertValidationErrors(ValidateJson.REL_VALID, false);
+ assertEquals(1, runner.getProvenanceEvents().size());
+ assertEquals(ProvenanceEventType.ROUTE, runner.getProvenanceEvents().get(0).getEventType());
+ }
+
+ @Test
+ void testNoSchemaVersionSpecified() {
+ final String schemaPath = getFilePath("schema-simple-example.json");
+ runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
runner.enqueue(JSON);
@@ -71,7 +113,7 @@ class TestValidateJson {
@Test
void testEmptySchema() {
runner.setProperty(ValidateJson.SCHEMA_CONTENT, "{}");
- runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
runner.run();
@@ -89,7 +131,7 @@ class TestValidateJson {
void testAllUnknownKeywordsSchema() {
runner.setProperty(ValidateJson.SCHEMA_CONTENT,
"{\"fruit\": \"Apple\", \"size\": \"Large\", \"color\": \"Red\"}");
- runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
runner.run();
@@ -107,7 +149,7 @@ class TestValidateJson {
void testPatternSchemaCheck() {
final String schemaPath = getFilePath("schema-simple-example-unmatched-pattern.json");
runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
- runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
runner.run();
@@ -125,7 +167,7 @@ class TestValidateJson {
void testMissingRequiredValue() {
final String schema = getFileContent("schema-simple-example-missing-required.json");
runner.setProperty(ValidateJson.SCHEMA_CONTENT, schema);
- runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
runner.run();
@@ -141,9 +183,8 @@ class TestValidateJson {
@Test
void testInvalidJson() {
- final String schema = getFileContent("schema-simple-example.json");
- runner.setProperty(ValidateJson.SCHEMA_CONTENT, schema);
- runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+ runner.setProperty(ValidateJson.SCHEMA_CONTENT, SIMPLE_SCHEMA);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(NON_JSON);
runner.run();
@@ -160,7 +201,7 @@ class TestValidateJson {
@Test
void testNonExistingSchema() {
runner.setProperty(ValidateJson.SCHEMA_CONTENT, "not-found.json");
- runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
assertThrows(AssertionFailedError.class, () -> runner.run());
@@ -169,7 +210,7 @@ class TestValidateJson {
@Test
void testBadSchema() {
runner.setProperty(ValidateJson.SCHEMA_CONTENT, NON_JSON);
- runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(JSON);
assertThrows(AssertionFailedError.class, () -> runner.run());
@@ -179,7 +220,7 @@ class TestValidateJson {
void testJsonWithComments() {
final String schemaPath = getFilePath("schema-simple-example.json");
runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
- runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
runner.enqueue(getFileContent("simple-example-with-comments.json"));
@@ -191,6 +232,28 @@ class TestValidateJson {
assertValidationErrors(ValidateJson.REL_VALID, false);
}
+
+ @Test
+ void testSchemaRetrievalFromRegistry() throws InitializationException {
+ final String registryIdentifier = "registry";
+ final String schemaName = "someSchema";
+ final JsonSchemaRegistry validJsonSchemaRegistry = new SampleJsonSchemaRegistry(registryIdentifier, schemaName);
+ runner.addControllerService(registryIdentifier, validJsonSchemaRegistry);
+ runner.enableControllerService(validJsonSchemaRegistry);
+ runner.assertValid(validJsonSchemaRegistry);
+ runner.setProperty(ValidateJson.SCHEMA_ACCESS_STRATEGY, ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY.getValue());
+ runner.setProperty(ValidateJson.SCHEMA_REGISTRY, registryIdentifier);
+
+ Map attributes = new HashMap<>();
+ attributes.put("schema.name", schemaName);
+ runner.enqueue(JSON, attributes);
+ runner.run();
+
+ runner.assertTransferCount(ValidateJson.REL_FAILURE, 0);
+ runner.assertTransferCount(ValidateJson.REL_INVALID, 0);
+ runner.assertTransferCount(ValidateJson.REL_VALID, 1);
+ }
+
private void assertValidationErrors(Relationship relationship, boolean expected) {
final Map attributes = runner.getFlowFilesForRelationship(relationship).get(0).getAttributes();
@@ -202,6 +265,13 @@ class TestValidateJson {
}
}
+ private static Stream customValidateArgs() {
+ return Stream.of(
+ Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY.getValue(), "requires that the JSON Schema Registry property be set", "no registry set"),
+ Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY.getValue(), "requires that the JSON Schema property be set", "no content specified")
+ );
+ }
+
private static String getFilePath(final String filename) {
final String path = getRelativeResourcePath(filename);
final URL url = Objects.requireNonNull(TestValidateJson.class.getResource(path), "Resource not found");
@@ -221,4 +291,28 @@ class TestValidateJson {
private static String getRelativeResourcePath(final String filename) {
return String.format("/%s/%s", TestValidateJson.class.getSimpleName(), filename);
}
+
+ private static class SampleJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry {
+ private final String identifier;
+ private final String schemaName;
+
+ public SampleJsonSchemaRegistry(String identifier, String schemaName) {
+ this.identifier = identifier;
+ this.schemaName = schemaName;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public JsonSchema retrieveSchema(String schemaName) throws SchemaNotFoundException {
+ if (this.schemaName.equals(schemaName)) {
+ return new JsonSchema(SchemaVersion.DRAFT_2020_12, "{}");
+ } else {
+ throw new SchemaNotFoundException("");
+ }
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
index d5f5032580..8b5b204eaa 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
@@ -31,5 +31,10 @@
org.apache.nifi
nifi-record
+
+ org.apache.nifi
+ nifi-json-schema-api
+ 2.0.0-SNAPSHOT
+
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/JsonSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/JsonSchemaRegistry.java
new file mode 100644
index 0000000000..364546fd10
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/JsonSchemaRegistry.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.json.schema.JsonSchema;
+import java.io.IOException;
+
+/**
+ * Represents {@link ControllerService} strategy to expose internal and/or
+ * integrate with external Schema Registry
+ */
+public interface JsonSchemaRegistry extends ControllerService {
+ /**
+ * Retrieves the schema based on the provided schema name
+ * @param schemaName The name of the schema
+ * @return the schema for the given descriptor
+ * @throws IOException if unable to communicate with the backing store
+ * @throws SchemaNotFoundException if unable to find the schema based on the given descriptor
+ */
+ JsonSchema retrieveSchema(String schemaName) throws IOException, SchemaNotFoundException;
+}