From 8f2501ffac68885c2baee4b313ff016454e038a3 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Thu, 16 Nov 2017 11:49:02 -0500 Subject: [PATCH] NIFI-4612: Allow AvroSchemaRegistry to disable name validation Signed-off-by: Pierre Villard This closes #2275. --- .../services/AvroSchemaRegistry.java | 84 ++++++++++++++++--- .../services/TestAvroSchemaRegistry.java | 49 ++++++++++- 2 files changed, 121 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java index 169d79ddbc..785d72958d 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java @@ -17,8 +17,13 @@ package org.apache.nifi.schemaregistry.services; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -32,8 +37,12 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import 
org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -49,26 +58,75 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch private final Map<String, String> schemaNameToSchemaMap; private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>(); + static final PropertyDescriptor VALIDATE_FIELD_NAMES = new PropertyDescriptor.Builder() + .name("avro-reg-validated-field-names") + .displayName("Validate Field Names") + .description("Whether or not to validate the field names in the Avro schema based on Avro naming rules. If set to true, all field names must be valid Avro names, " + + "which must begin with [A-Za-z_], and subsequently contain only [A-Za-z0-9_]. If set to false, no validation will be performed on the field names.") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + private List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(); + public AvroSchemaRegistry() { this.schemaNameToSchemaMap = new HashMap<>(); } + @Override + protected void init(ControllerServiceInitializationContext config) throws InitializationException { + super.init(config); + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(VALIDATE_FIELD_NAMES); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + } + @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - if (newValue == null) { - recordSchemas.remove(descriptor.getName()); - } else { - try { - final Schema avroSchema = new Schema.Parser().parse(newValue); - final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName()); - final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId); - recordSchemas.put(descriptor.getName(), recordSchema); - } catch
(final Exception e) { - // not a problem - the service won't be valid and the validation message will indicate what is wrong. + if(descriptor.isDynamic()) { + // Dynamic property = schema, validate it + if (newValue == null) { + recordSchemas.remove(descriptor.getName()); + } else { + try { + // Use a non-strict parser here, a strict parse can be done (if specified) in customValidate(). + final Schema avroSchema = new Schema.Parser().setValidate(false).parse(newValue); + final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName()); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId); + recordSchemas.put(descriptor.getName(), recordSchema); + } catch (final Exception e) { + // not a problem - the service won't be valid and the validation message will indicate what is wrong. + } } } } + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + Set<ValidationResult> results = new HashSet<>(); + boolean strict = validationContext.getProperty(VALIDATE_FIELD_NAMES).asBoolean(); + + // Iterate over dynamic properties, validating the schemas, and adding results + validationContext.getProperties().entrySet().stream().filter(entry -> entry.getKey().isDynamic()).forEach(entry -> { + String subject = entry.getKey().getDisplayName(); + String input = entry.getValue(); + + try { + final Schema avroSchema = new Schema.Parser().setValidate(strict).parse(input); + AvroTypeUtil.createSchema(avroSchema, input, SchemaIdentifier.EMPTY); + } catch (final Exception e) { + results.add(new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Not a valid Avro Schema: " + e.getMessage()) + .build()); + } + }); + return results; + } + @Override public String retrieveSchemaText(final String schemaName) throws SchemaNotFoundException { final String schemaText = schemaNameToSchemaMap.get(schemaName); @@ -111,13 +169,17 @@ public class AvroSchemaRegistry extends
AbstractControllerService implements Sch .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue()))); } + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() .name(propertyDescriptorName) .required(false) - .addValidator(new AvroSchemaValidator()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .dynamic(true) .expressionLanguageSupported(true) .build(); diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java index 9121f04f65..67a095924d 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java @@ -17,13 +17,18 @@ package org.apache.nifi.schemaregistry.services; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.junit.Assert; @@ -54,7 +59,6 @@ public class TestAvroSchemaRegistry {
when(configContext.getProperties()).thenReturn(properties); AvroSchemaRegistry delegate = new AvroSchemaRegistry(); delegate.enable(configContext); - String locatedSchemaText = delegate.retrieveSchemaText(schemaName); assertEquals(fooSchemaText, locatedSchemaText); try { @@ -65,4 +69,47 @@ public class TestAvroSchemaRegistry { delegate.close(); } + + @Test + public void validateStrictAndNonStrictSchemaRegistrationFromDynamicProperties() throws Exception { + String schemaName = "fooSchema"; + ConfigurationContext configContext = mock(ConfigurationContext.class); + Map<PropertyDescriptor, String> properties = new HashMap<>(); + PropertyDescriptor fooSchema = new PropertyDescriptor.Builder() + .name(schemaName) + .dynamic(true) + .build(); + // NOTE: name of record and name of first field are not Avro-compliant, verified below + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"$User\", " + + "\"fields\": [ " + "{\"name\": \"@name\", \"type\": [\"string\", \"null\"]}, " + + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, " + + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, " + + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}"; + PropertyDescriptor barSchema = new PropertyDescriptor.Builder() + .name("barSchema") + .dynamic(false) + .build(); + properties.put(fooSchema, fooSchemaText); + properties.put(barSchema, ""); + AvroSchemaRegistry delegate = new AvroSchemaRegistry(); + delegate.getSupportedPropertyDescriptors().forEach(prop -> properties.put(prop, prop.getDisplayName())); + when(configContext.getProperties()).thenReturn(properties); + + ValidationContext validationContext = mock(ValidationContext.class); + when(validationContext.getProperties()).thenReturn(properties); + PropertyValue propertyValue = mock(PropertyValue.class); + when(validationContext.getProperty(AvroSchemaRegistry.VALIDATE_FIELD_NAMES)).thenReturn(propertyValue); + + // Strict parsing +
when(propertyValue.asBoolean()).thenReturn(true); + Collection<ValidationResult> results = delegate.customValidate(validationContext); + assertTrue(results.stream().anyMatch(result -> !result.isValid())); + + // Non-strict parsing + when(propertyValue.asBoolean()).thenReturn(false); + results = delegate.customValidate(validationContext); + results.forEach(result -> assertTrue(result.isValid())); + + delegate.close(); + } }