mirror of https://github.com/apache/nifi.git
NIFI-4612: Allow AvroSchemaRegistry to disable name validation
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2275.
This commit is contained in:
parent
ff5325b923
commit
8f2501ffac
|
@ -17,8 +17,13 @@
|
||||||
package org.apache.nifi.schemaregistry.services;
|
package org.apache.nifi.schemaregistry.services;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -32,8 +37,12 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
import org.apache.nifi.avro.AvroTypeUtil;
|
import org.apache.nifi.avro.AvroTypeUtil;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.apache.nifi.schema.access.SchemaField;
|
import org.apache.nifi.schema.access.SchemaField;
|
||||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||||
|
@ -49,26 +58,75 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
|
||||||
private final Map<String, String> schemaNameToSchemaMap;
|
private final Map<String, String> schemaNameToSchemaMap;
|
||||||
private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
static final PropertyDescriptor VALIDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
|
||||||
|
.name("avro-reg-validated-field-names")
|
||||||
|
.displayName("Validate Field Names")
|
||||||
|
.description("Whether or not to validate the field names in the Avro schema based on Avro naming rules. If set to true, all field names must be valid Avro names, "
|
||||||
|
+ "which must begin with [A-Za-z_], and subsequently contain only [A-Za-z0-9_]. If set to false, no validation will be performed on the field names.")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("true")
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
|
||||||
|
|
||||||
public AvroSchemaRegistry() {
|
public AvroSchemaRegistry() {
|
||||||
this.schemaNameToSchemaMap = new HashMap<>();
|
this.schemaNameToSchemaMap = new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
|
||||||
|
super.init(config);
|
||||||
|
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
|
||||||
|
_propertyDescriptors.add(VALIDATE_FIELD_NAMES);
|
||||||
|
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||||
if (newValue == null) {
|
if(descriptor.isDynamic()) {
|
||||||
recordSchemas.remove(descriptor.getName());
|
// Dynamic property = schema, validate it
|
||||||
} else {
|
if (newValue == null) {
|
||||||
try {
|
recordSchemas.remove(descriptor.getName());
|
||||||
final Schema avroSchema = new Schema.Parser().parse(newValue);
|
} else {
|
||||||
final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName());
|
try {
|
||||||
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId);
|
// Use a non-strict parser here, a strict parse can be done (if specified) in customValidate().
|
||||||
recordSchemas.put(descriptor.getName(), recordSchema);
|
final Schema avroSchema = new Schema.Parser().setValidate(false).parse(newValue);
|
||||||
} catch (final Exception e) {
|
final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName());
|
||||||
// not a problem - the service won't be valid and the validation message will indicate what is wrong.
|
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId);
|
||||||
|
recordSchemas.put(descriptor.getName(), recordSchema);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
// not a problem - the service won't be valid and the validation message will indicate what is wrong.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||||
|
Set<ValidationResult> results = new HashSet<>();
|
||||||
|
boolean strict = validationContext.getProperty(VALIDATE_FIELD_NAMES).asBoolean();
|
||||||
|
|
||||||
|
// Iterate over dynamic properties, validating the schemas, and adding results
|
||||||
|
validationContext.getProperties().entrySet().stream().filter(entry -> entry.getKey().isDynamic()).forEach(entry -> {
|
||||||
|
String subject = entry.getKey().getDisplayName();
|
||||||
|
String input = entry.getValue();
|
||||||
|
|
||||||
|
try {
|
||||||
|
final Schema avroSchema = new Schema.Parser().setValidate(strict).parse(input);
|
||||||
|
AvroTypeUtil.createSchema(avroSchema, input, SchemaIdentifier.EMPTY);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
results.add(new ValidationResult.Builder()
|
||||||
|
.input(input)
|
||||||
|
.subject(subject)
|
||||||
|
.valid(false)
|
||||||
|
.explanation("Not a valid Avro Schema: " + e.getMessage())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String retrieveSchemaText(final String schemaName) throws SchemaNotFoundException {
|
public String retrieveSchemaText(final String schemaName) throws SchemaNotFoundException {
|
||||||
final String schemaText = schemaNameToSchemaMap.get(schemaName);
|
final String schemaText = schemaNameToSchemaMap.get(schemaName);
|
||||||
|
@ -111,13 +169,17 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
|
||||||
.collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
|
.collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return propertyDescriptors;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||||
return new PropertyDescriptor.Builder()
|
return new PropertyDescriptor.Builder()
|
||||||
.name(propertyDescriptorName)
|
.name(propertyDescriptorName)
|
||||||
.required(false)
|
.required(false)
|
||||||
.addValidator(new AvroSchemaValidator())
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.dynamic(true)
|
.dynamic(true)
|
||||||
.expressionLanguageSupported(true)
|
.expressionLanguageSupported(true)
|
||||||
.build();
|
.build();
|
||||||
|
|
|
@ -17,13 +17,18 @@
|
||||||
package org.apache.nifi.schemaregistry.services;
|
package org.apache.nifi.schemaregistry.services;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -54,7 +59,6 @@ public class TestAvroSchemaRegistry {
|
||||||
when(configContext.getProperties()).thenReturn(properties);
|
when(configContext.getProperties()).thenReturn(properties);
|
||||||
AvroSchemaRegistry delegate = new AvroSchemaRegistry();
|
AvroSchemaRegistry delegate = new AvroSchemaRegistry();
|
||||||
delegate.enable(configContext);
|
delegate.enable(configContext);
|
||||||
|
|
||||||
String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
|
String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
|
||||||
assertEquals(fooSchemaText, locatedSchemaText);
|
assertEquals(fooSchemaText, locatedSchemaText);
|
||||||
try {
|
try {
|
||||||
|
@ -65,4 +69,47 @@ public class TestAvroSchemaRegistry {
|
||||||
|
|
||||||
delegate.close();
|
delegate.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void validateStrictAndNonStrictSchemaRegistrationFromDynamicProperties() throws Exception {
|
||||||
|
String schemaName = "fooSchema";
|
||||||
|
ConfigurationContext configContext = mock(ConfigurationContext.class);
|
||||||
|
Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||||
|
PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
|
||||||
|
.name(schemaName)
|
||||||
|
.dynamic(true)
|
||||||
|
.build();
|
||||||
|
// NOTE: name of record and name of first field are not Avro-compliant, verified below
|
||||||
|
String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"$User\", "
|
||||||
|
+ "\"fields\": [ " + "{\"name\": \"@name\", \"type\": [\"string\", \"null\"]}, "
|
||||||
|
+ "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, "
|
||||||
|
+ "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, "
|
||||||
|
+ "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
|
||||||
|
PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
|
||||||
|
.name("barSchema")
|
||||||
|
.dynamic(false)
|
||||||
|
.build();
|
||||||
|
properties.put(fooSchema, fooSchemaText);
|
||||||
|
properties.put(barSchema, "");
|
||||||
|
AvroSchemaRegistry delegate = new AvroSchemaRegistry();
|
||||||
|
delegate.getSupportedPropertyDescriptors().forEach(prop -> properties.put(prop, prop.getDisplayName()));
|
||||||
|
when(configContext.getProperties()).thenReturn(properties);
|
||||||
|
|
||||||
|
ValidationContext validationContext = mock(ValidationContext.class);
|
||||||
|
when(validationContext.getProperties()).thenReturn(properties);
|
||||||
|
PropertyValue propertyValue = mock(PropertyValue.class);
|
||||||
|
when(validationContext.getProperty(AvroSchemaRegistry.VALIDATE_FIELD_NAMES)).thenReturn(propertyValue);
|
||||||
|
|
||||||
|
// Strict parsing
|
||||||
|
when(propertyValue.asBoolean()).thenReturn(true);
|
||||||
|
Collection<ValidationResult> results = delegate.customValidate(validationContext);
|
||||||
|
assertTrue(results.stream().anyMatch(result -> !result.isValid()));
|
||||||
|
|
||||||
|
// Non-strict parsing
|
||||||
|
when(propertyValue.asBoolean()).thenReturn(false);
|
||||||
|
results = delegate.customValidate(validationContext);
|
||||||
|
results.forEach(result -> assertTrue(result.isValid()));
|
||||||
|
|
||||||
|
delegate.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue