NIFI-4935 Refactoring to support specifying schema branch or schema version when using schema by name strategy

This closes #2523.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Bryan Bende 2018-03-05 17:02:20 -05:00 committed by Mark Payne
parent 930417b9dc
commit de71a41bd0
35 changed files with 1282 additions and 299 deletions

View File

@ -22,7 +22,8 @@ public enum SchemaField {
SCHEMA_TEXT_FORMAT("Schema Text Format"),
SCHEMA_NAME("Schema Name"),
SCHEMA_IDENTIFIER("Schema Identifier"),
SCHEMA_VERSION("Schema Version");
SCHEMA_VERSION("Schema Version"),
SCHEMA_BRANCH_NAME("Schema Branch Name");
private final String description;

View File

@ -23,7 +23,7 @@ public class SchemaNotFoundException extends Exception {
}
public SchemaNotFoundException(final String message, final Throwable cause) {
super(cause);
super(message, cause);
}
public SchemaNotFoundException(final Throwable cause) {

View File

@ -38,14 +38,32 @@ public interface SchemaIdentifier {
*/
OptionalInt getVersion();
/**
* @return the name of the branch where the schema is located, if one has been defined
*/
Optional<String> getBranch();
public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null);
public static SchemaIdentifier ofName(final String name) {
return new StandardSchemaIdentifier(name, null, null);
SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null);
static Builder builder() {
return new StandardSchemaIdentifier.Builder();
}
public static SchemaIdentifier of(final String name, final long identifier, final int version) {
return new StandardSchemaIdentifier(name, identifier, version);
/**
* Implementations should provide a builder to create instances of the SchemaIdentifier.
*/
interface Builder {
Builder name(String name);
Builder id(Long id);
Builder version(Integer version);
Builder branch(String branch);
SchemaIdentifier build();
}
}

View File

@ -25,11 +25,17 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
private final Optional<String> name;
private final OptionalLong identifier;
private final OptionalInt version;
private final Optional<String> branch;
StandardSchemaIdentifier(final String name, final Long identifier, final Integer version) {
StandardSchemaIdentifier(final String name, final Long identifier, final Integer version, final String branch) {
this.name = Optional.ofNullable(name);
this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);;
this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);;
this.branch = Optional.ofNullable(branch);
if (this.name == null && this.identifier == null) {
throw new IllegalStateException("Name or Identifier must be provided");
}
}
@Override
@ -47,9 +53,14 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
return version;
}
@Override
public Optional<String> getBranch() {
return branch;
}
@Override
public int hashCode() {
return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode();
return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode() + 41 * getBranch().hashCode();
}
@Override
@ -64,6 +75,49 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
return false;
}
final SchemaIdentifier other = (SchemaIdentifier) obj;
return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion());
return getName().equals(other.getName())
&& getIdentifier().equals(other.getIdentifier())
&& getVersion().equals(other.getVersion())
&& getBranch().equals(other.getBranch());
}
/**
* Builder to create instances of SchemaIdentifier.
*/
public static class Builder implements SchemaIdentifier.Builder {
private String name;
private String branch;
private Long identifier;
private Integer version;
@Override
public SchemaIdentifier.Builder name(final String name) {
this.name = name;
return this;
}
@Override
public SchemaIdentifier.Builder id(final Long id) {
this.identifier = id;
return this;
}
@Override
public SchemaIdentifier.Builder version(final Integer version) {
this.version = version;
return this;
}
@Override
public SchemaIdentifier.Builder branch(final String branch) {
this.branch = branch;
return this;
}
@Override
public SchemaIdentifier build() {
return new StandardSchemaIdentifier(name, identifier, version, branch);
}
}
}

View File

@ -17,19 +17,6 @@
package org.apache.nifi.confluent.schemaregistry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@ -47,9 +34,24 @@ import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Tags({"schema", "registry", "confluent", "avro", "kafka"})
@ -176,28 +178,33 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
return baseUrls;
}
@Override
public String retrieveSchemaText(final String schemaName) throws IOException, SchemaNotFoundException {
final RecordSchema schema = retrieveSchema(schemaName);
return schema.getSchemaText().get();
private RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
final Optional<String> schemaName = schemaIdentifier.getName();
if (!schemaName.isPresent()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
}
final RecordSchema schema = client.getSchema(schemaName.get());
return schema;
}
@Override
public String retrieveSchemaText(final long schemaId, final int version) throws IOException, SchemaNotFoundException {
final RecordSchema schema = retrieveSchema(schemaId, version);
return schema.getSchemaText().get();
}
private RecordSchema retrieveSchemaById(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
final OptionalLong schemaId = schemaIdentifier.getIdentifier();
if (!schemaId.isPresent()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
}
@Override
public RecordSchema retrieveSchema(final String schemaName) throws IOException, SchemaNotFoundException {
final RecordSchema schema = client.getSchema(schemaName);
final RecordSchema schema = client.getSchema((int) schemaId.getAsLong());
return schema;
}
@Override
public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, SchemaNotFoundException {
final RecordSchema schema = client.getSchema((int) schemaId);
return schema;
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
if (schemaIdentifier.getName().isPresent()) {
return retrieveSchemaByName(schemaIdentifier);
} else {
return retrieveSchemaById(schemaIdentifier);
}
}
@Override

View File

@ -145,7 +145,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
try {
final Schema avroSchema = new Schema.Parser().parse(schemaText);
final SchemaIdentifier schemaId = SchemaIdentifier.of(subject, id, version);
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id(Long.valueOf(id)).version(version).build();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
return recordSchema;

View File

@ -16,19 +16,20 @@
*/
package org.apache.nifi.schema.access;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.avro.AvroSchemaValidator;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
public class SchemaAccessUtils {
public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
@ -77,6 +78,26 @@ public class SchemaAccessUtils {
.required(false)
.build();
public static final PropertyDescriptor SCHEMA_BRANCH_NAME = new PropertyDescriptor.Builder()
.name("schema-branch")
.displayName("Schema Branch")
.description("Specifies the name of the branch to use when looking up the schema in the Schema Registry property. " +
"If the chosen Schema Registry does not support branching, this value will be ignored.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
public static final PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor.Builder()
.name("schema-version")
.displayName("Schema Version")
.description("Specifies the version of the schema to lookup in the Schema Registry. " +
"If not specified then the latest version of the schema will be retrieved.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
public static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
.name("schema-text")
.displayName("Schema Text")
@ -89,12 +110,15 @@ public class SchemaAccessUtils {
public static Collection<ValidationResult> validateSchemaAccessStrategy(final ValidationContext validationContext, final String schemaAccessStrategyValue,
final List<AllowableValue> schemaAccessStrategyValues) {
final Collection<ValidationResult> validationResults = new ArrayList<>();
if (isSchemaRegistryRequired(schemaAccessStrategyValue)) {
final boolean registrySet = validationContext.getProperty(SCHEMA_REGISTRY).isSet();
if (!registrySet) {
final String schemaAccessStrategyName = getSchemaAccessStrategyName(schemaAccessStrategyValue, schemaAccessStrategyValues);
return Collections.singleton(new ValidationResult.Builder()
validationResults.add(new ValidationResult.Builder()
.subject("Schema Registry")
.explanation("The '" + schemaAccessStrategyName + "' Schema Access Strategy requires that the Schema Registry property be set.")
.valid(false)
@ -102,7 +126,21 @@ public class SchemaAccessUtils {
}
}
return Collections.emptyList();
// ensure that only branch or version is specified, but not both
if (SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessStrategyValue)) {
final boolean branchNameSet = validationContext.getProperty(SCHEMA_BRANCH_NAME).isSet();
final boolean versionSet = validationContext.getProperty(SCHEMA_VERSION).isSet();
if (branchNameSet && versionSet) {
validationResults.add(new ValidationResult.Builder()
.subject(SCHEMA_BRANCH_NAME.getDisplayName())
.explanation(SCHEMA_BRANCH_NAME.getDisplayName() + " and " + SCHEMA_VERSION.getDisplayName() + " cannot be specified together")
.valid(false)
.build());
}
}
return validationResults;
}
private static String getSchemaAccessStrategyName(final String schemaAccessValue, final List<AllowableValue> schemaAccessStrategyValues) {
@ -123,7 +161,10 @@ public class SchemaAccessUtils {
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
final PropertyValue schemaName = context.getProperty(SCHEMA_NAME);
final PropertyValue schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME);
final PropertyValue schemaVersion = context.getProperty(SCHEMA_VERSION);
return new SchemaNamePropertyStrategy(schemaRegistry, schemaName, schemaBranchName, schemaVersion);
} else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
return new InheritSchemaFromRecord();
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {

View File

@ -17,19 +17,21 @@
package org.apache.nifi.serialization.record;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.util.Tuple;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class MockSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private final ConcurrentMap<String, RecordSchema> schemaNameMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Tuple<Long, Integer>, RecordSchema> schemaIdVersionMap = new ConcurrentHashMap<>();
@ -38,41 +40,40 @@ public class MockSchemaRegistry extends AbstractControllerService implements Sch
schemaNameMap.put(name, schema);
}
@Override
public String retrieveSchemaText(final String schemaName) throws IOException, SchemaNotFoundException {
final RecordSchema schema = schemaNameMap.get(schemaName);
if (schema == null) {
return null;
private RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
final Optional<String> schemaName = schemaIdentifier.getName();
if (!schemaName.isPresent()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
}
final Optional<String> text = schema.getSchemaText();
return text.orElse(null);
}
@Override
public String retrieveSchemaText(final long schemaId, final int version) throws IOException, SchemaNotFoundException {
final Tuple<Long, Integer> tuple = new Tuple<>(schemaId, version);
final RecordSchema schema = schemaIdVersionMap.get(tuple);
if (schema == null) {
return null;
}
final Optional<String> text = schema.getSchemaText();
return text.orElse(null);
}
@Override
public RecordSchema retrieveSchema(final String schemaName) throws IOException, SchemaNotFoundException {
return schemaNameMap.get(schemaName);
}
@Override
public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, SchemaNotFoundException {
final Tuple<Long, Integer> tuple = new Tuple<>(schemaId, version);
private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
final OptionalLong schemaId = schemaIdentifier.getIdentifier();
if (!schemaId.isPresent()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
}
final OptionalInt version = schemaIdentifier.getVersion();
if (!version.isPresent()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Version is not present");
}
final Tuple<Long, Integer> tuple = new Tuple<>(schemaId.getAsLong(), version.getAsInt());
final RecordSchema schema = schemaIdVersionMap.get(tuple);
return schema;
}
@Override
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
if (schemaIdentifier.getName().isPresent()) {
return retrieveSchemaByName(schemaIdentifier);
} else {
return retrieveSchemaByIdAndVersion(schemaIdentifier);
}
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return EnumSet.allOf(SchemaField.class);

View File

@ -56,5 +56,11 @@
<artifactId>commons-csv</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.6.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -17,6 +17,11 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -25,10 +30,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.stream.io.StreamUtils;
public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy {
private final Set<SchemaField> schemaFields;
private final SchemaRegistry schemaRegistry;
@ -64,7 +65,13 @@ public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy {
}
final int schemaId = bb.getInt();
return schemaRegistry.retrieveSchema(schemaId, 1);
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
.id(Long.valueOf(schemaId))
.version(1)
.build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
}
@Override

View File

@ -17,6 +17,9 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@ -27,16 +30,13 @@ import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class ConfluentSchemaRegistryWriter implements SchemaAccessWriter {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
@Override
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
final SchemaIdentifier identifier = schema.getIdentifier();
final long id = identifier.getIdentifier().getAsLong();
final Long id = identifier.getIdentifier().getAsLong();
// This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer
// as it is provided at:
@ -45,7 +45,7 @@ public class ConfluentSchemaRegistryWriter implements SchemaAccessWriter {
// representing the schema id.
final ByteBuffer bb = ByteBuffer.allocate(5);
bb.put((byte) 0);
bb.putInt((int) id);
bb.putInt(id.intValue());
out.write(bb.array());
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.schema.access;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.io.InputStream;
@ -34,6 +35,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version";
private final SchemaRegistry schemaRegistry;
@ -84,7 +86,8 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
final long schemaId = Long.parseLong(schemaIdentifier);
final int version = Integer.parseInt(schemaVersion);
final RecordSchema schema = schemaRegistry.retrieveSchema(schemaId, version);
final SchemaIdentifier identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
if (schema == null) {
throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'");
}

View File

@ -29,7 +29,8 @@ import java.util.Set;
public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
private static final int LATEST_PROTOCOL_VERSION = 1;
static final int LATEST_PROTOCOL_VERSION = 1;
static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
@Override
public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
@ -40,13 +41,17 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
final Map<String, String> attributes = new HashMap<>(4);
final SchemaIdentifier id = schema.getIdentifier();
final long schemaId = id.getIdentifier().getAsLong();
final int schemaVersion = id.getVersion().getAsInt();
final Long schemaId = id.getIdentifier().getAsLong();
final Integer schemaVersion = id.getVersion().getAsInt();
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(LATEST_PROTOCOL_VERSION));
if (id.getBranch().isPresent()) {
attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
}
return attributes;
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.schema.access;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
@ -66,7 +67,8 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
final long schemaId = bb.getLong();
final int schemaVersion = bb.getInt();
return schemaRegistry.retrieveSchema(schemaId, schemaVersion);
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
}
@Override

View File

@ -17,6 +17,9 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@ -27,9 +30,6 @@ import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
private static final int LATEST_PROTOCOL_VERSION = 1;
@ -37,8 +37,8 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
@Override
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
final SchemaIdentifier identifier = schema.getIdentifier();
final long id = identifier.getIdentifier().getAsLong();
final int version = identifier.getVersion().getAsInt();
final Long id = identifier.getIdentifier().getAsLong();
final Integer version = identifier.getVersion().getAsInt();
// This decoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
// as it is provided at:

View File

@ -19,10 +19,11 @@ package org.apache.nifi.schema.access;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import org.apache.nifi.serialization.record.RecordSchema;
@ -30,7 +31,9 @@ import org.apache.nifi.serialization.record.SchemaIdentifier;
public class SchemaNameAsAttribute implements SchemaAccessWriter {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME);
private static final String SCHEMA_NAME_ATTRIBUTE = "schema.name";
static final String SCHEMA_NAME_ATTRIBUTE = "schema.name";
static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
@Override
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
@ -38,17 +41,34 @@ public class SchemaNameAsAttribute implements SchemaAccessWriter {
@Override
public Map<String, String> getAttributes(final RecordSchema schema) {
final Map<String,String> attributes = new HashMap<>(3);
final SchemaIdentifier identifier = schema.getIdentifier();
final Optional<String> nameOption = identifier.getName();
if (nameOption.isPresent()) {
return Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, nameOption.get());
attributes.put(SCHEMA_NAME_ATTRIBUTE, nameOption.get());
}
return Collections.emptyMap();
final OptionalInt versionOption = identifier.getVersion();
if (versionOption.isPresent()) {
attributes.put(SCHEMA_VERSION_ATTRIBUTE, String.valueOf(versionOption.getAsInt()));
}
final Optional<String> branchOption = identifier.getBranch();
if (branchOption.isPresent()) {
attributes.put(SCHEMA_BRANCH_ATTRIBUTE, branchOption.get());
}
return attributes;
}
@Override
public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
final SchemaIdentifier schemaId = schema.getIdentifier();
if (schemaId == null) {
throw new SchemaNotFoundException("Cannot write Schema Name As Attribute because Schema Identifier is not known");
}
if (!schemaId.getName().isPresent()) {
throw new SchemaNotFoundException("Cannot write Schema Name As Attribute because the Schema Name is not known");
}

View File

@ -17,9 +17,11 @@
package org.apache.nifi.schema.access;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.InputStream;
import java.util.Collections;
@ -32,10 +34,17 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
private final SchemaRegistry schemaRegistry;
private final PropertyValue schemaNamePropertyValue;
private final PropertyValue schemaBranchNamePropertyValue;
private final PropertyValue schemaVersionPropertyValue;
public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry, final PropertyValue schemaNamePropertyValue) {
public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry,
final PropertyValue schemaNamePropertyValue,
final PropertyValue schemaBranchNamePropertyValue,
final PropertyValue schemaVersionPropertyValue) {
this.schemaRegistry = schemaRegistry;
this.schemaNamePropertyValue = schemaNamePropertyValue;
this.schemaBranchNamePropertyValue = schemaBranchNamePropertyValue;
this.schemaVersionPropertyValue = schemaVersionPropertyValue;
schemaFields = new HashSet<>();
schemaFields.add(SchemaField.SCHEMA_NAME);
@ -50,12 +59,33 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
}
try {
final RecordSchema recordSchema = schemaRegistry.retrieveSchema(schemaName);
final String schemaBranchName = schemaBranchNamePropertyValue.evaluateAttributeExpressions(variables).getValue();
final String schemaVersion = schemaVersionPropertyValue.evaluateAttributeExpressions(variables).getValue();
final SchemaIdentifier.Builder identifierBuilder = SchemaIdentifier.builder();
identifierBuilder.name(schemaName);
if (!StringUtils.isBlank(schemaBranchName)) {
identifierBuilder.branch(schemaBranchName);
}
if (!StringUtils.isBlank(schemaVersion)) {
try {
identifierBuilder.version(Integer.valueOf(schemaVersion));
} catch (NumberFormatException nfe) {
throw new SchemaNotFoundException("Could not retrieve schema with name '" + schemaName
+ "' because a non-numeric version was supplied '" + schemaVersion + "'", nfe);
}
}
final RecordSchema recordSchema = schemaRegistry.retrieveSchema(identifierBuilder.build());
if (recordSchema == null) {
throw new SchemaNotFoundException("Could not find a schema with name '" + schemaName + "' in the configured Schema Registry");
}
return recordSchema;
} catch (final SchemaNotFoundException snf) {
throw snf;
} catch (final Exception e) {
throw new SchemaNotFoundException("Could not retrieve schema with name '" + schemaName + "' from the configured Schema Registry", e);
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Before;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
public class AbstractSchemaAccessStrategyTest {
protected SchemaRegistry schemaRegistry;
protected RecordSchema recordSchema;
@Before
public void setup() {
this.schemaRegistry = Mockito.mock(SchemaRegistry.class);
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
.name("person").branch("master").version(1).id(1L).build();
this.recordSchema = new SimpleRecordSchema(fields, schemaIdentifier);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.mockito.ArgumentMatcher;
/**
* ArgumentMatcher for SchemaIdentifier.
*/
public class SchemaIdentifierMatcher extends ArgumentMatcher<SchemaIdentifier> {
private final SchemaIdentifier expectedIdentifier;
public SchemaIdentifierMatcher(final SchemaIdentifier expectedIdentifier) {
this.expectedIdentifier = expectedIdentifier;
}
@Override
public boolean matches(final Object argument) {
if (argument == null || !(argument instanceof SchemaIdentifier)) {
return false;
}
final SchemaIdentifier other = (SchemaIdentifier) argument;
return other.equals(expectedIdentifier);
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.when;
public class TestConfluentSchemaRegistryStrategy extends AbstractSchemaAccessStrategyTest {
@Test
public void testGetSchemaWithValidEncoding() throws IOException, SchemaNotFoundException {
final SchemaAccessStrategy schemaAccessStrategy = new ConfluentSchemaRegistryStrategy(schemaRegistry);
final int schemaId = 123456;
try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(bytesOut)) {
out.write(0);
out.writeInt(schemaId);
out.flush();
try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) {
// the confluent strategy will read the id from the input stream and use '1' as the version
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.id((long)schemaId)
.version(1)
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema);
assertNotNull(retrievedSchema);
}
}
}
@Test(expected = SchemaNotFoundException.class)
public void testGetSchemaWithInvalidEncoding() throws IOException, SchemaNotFoundException {
final SchemaAccessStrategy schemaAccessStrategy = new ConfluentSchemaRegistryStrategy(schemaRegistry);
final int schemaId = 123456;
try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(bytesOut)) {
out.write(1); // write an invalid magic byte
out.writeInt(schemaId);
out.flush();
try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) {
schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema);
}
}
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class TestConfluentSchemaRegistryWriter {
@Test
public void testValidateValidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new ConfluentSchemaRegistryWriter();
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateInvalidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new ConfluentSchemaRegistryWriter();
schemaAccessWriter.validateSchema(recordSchema);
}
@Test
public void testWriteHeader() throws IOException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final SchemaAccessWriter schemaAccessWriter = new ConfluentSchemaRegistryWriter();
schemaAccessWriter.writeHeader(recordSchema, out);
try (final ByteArrayInputStream bytesIn = new ByteArrayInputStream(out.toByteArray());
final DataInputStream in = new DataInputStream(bytesIn)) {
Assert.assertEquals(0, in.readByte());
Assert.assertEquals((int) schemaIdentifier.getIdentifier().getAsLong(), in.readInt());
}
}
private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
return new SimpleRecordSchema(fields, schemaIdentifier);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.when;
public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest {
@Test
public void testGetSchemaWithValidAttributes() throws IOException, SchemaNotFoundException {
final long schemaId = 123456;
final int version = 2;
final int protocol = 1;
final Map<String,String> attributes = new HashMap<>();
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.id(schemaId)
.version(version)
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
assertNotNull(retrievedSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testGetSchemaMissingAttributes() throws IOException, SchemaNotFoundException {
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
schemaAccessStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TestHortonworksAttributeSchemaReferenceWriter {
@Test
public void testValidateWithValidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateWithInvalidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
schemaAccessWriter.validateSchema(recordSchema);
}
@Test
public void testGetAttributesWithoutBranch() {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
Assert.assertEquals(3, attributes.size());
Assert.assertEquals(String.valueOf(schemaIdentifier.getIdentifier().getAsLong()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE));
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
}
@Test
public void testGetAttributesWithBranch() {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).branch("foo").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
Assert.assertEquals(4, attributes.size());
Assert.assertEquals(String.valueOf(schemaIdentifier.getIdentifier().getAsLong()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE));
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
Assert.assertEquals("foo", attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
}
private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
return new SimpleRecordSchema(fields, schemaIdentifier);
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.when;
public class TestHortonworksEncodedSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest {
@Test
public void testGetSchemaWithValidEncoding() throws IOException, SchemaNotFoundException {
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
final int protocol = 1;
final long schemaId = 123456;
final int version = 2;
try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(bytesOut)) {
out.write(protocol);
out.writeLong(schemaId);
out.writeInt(version);
out.flush();
try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) {
// the confluent strategy will read the id from the input stream and use '1' as the version
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.id(schemaId)
.version(version)
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema);
assertNotNull(retrievedSchema);
}
}
}
@Test(expected = SchemaNotFoundException.class)
public void testGetSchemaWithInvalidProtocol() throws IOException, SchemaNotFoundException {
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
final int protocol = 0; // use an invalid protocol
final long schemaId = 123456;
final int version = 2;
try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(bytesOut)) {
out.write(protocol);
out.writeLong(schemaId);
out.writeInt(version);
out.flush();
try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) {
schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema);
}
}
}
}

View File

@ -17,7 +17,10 @@
package org.apache.nifi.schema.access;
import static org.junit.Assert.assertEquals;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -25,10 +28,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collections;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestHortonworksEncodedSchemaReferenceWriter {
@ -36,7 +36,7 @@ public class TestHortonworksEncodedSchemaReferenceWriter {
public void testHeader() throws IOException {
final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter();
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.of("name", 48L, 2));
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.builder().name("name").id( 48L).version( 2).build());
final byte[] header;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TestSchemaNameAsAttribute {
private List<RecordField> fields;
private SchemaAccessWriter schemaAccessWriter;
@Before
public void setup() {
fields = new ArrayList<>();
fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
schemaAccessWriter = new SchemaNameAsAttribute();
}
@Test
public void testWriteNameBranchAndVersion() {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
.name("person").branch("master").version(1).id(1L).build();
final RecordSchema schema = new SimpleRecordSchema(fields, schemaIdentifier);
final Map<String,String> attributes = schemaAccessWriter.getAttributes(schema);
Assert.assertEquals(3, attributes.size());
Assert.assertEquals(schemaIdentifier.getName().get(), attributes.get(SchemaNameAsAttribute.SCHEMA_NAME_ATTRIBUTE));
Assert.assertEquals(schemaIdentifier.getBranch().get(), attributes.get(SchemaNameAsAttribute.SCHEMA_BRANCH_ATTRIBUTE));
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()), attributes.get(SchemaNameAsAttribute.SCHEMA_VERSION_ATTRIBUTE));
}
@Test
public void testWriteOnlyName() {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("person").id(1L).build();
final RecordSchema schema = new SimpleRecordSchema(fields, schemaIdentifier);
final Map<String,String> attributes = schemaAccessWriter.getAttributes(schema);
Assert.assertEquals(1, attributes.size());
Assert.assertEquals(schemaIdentifier.getName().get(), attributes.get(SchemaNameAsAttribute.SCHEMA_NAME_ATTRIBUTE));
}
@Test
public void testValidateSchemaWhenValid() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("person").id(1L).build();
final RecordSchema schema = new SimpleRecordSchema(fields, schemaIdentifier);
schemaAccessWriter.validateSchema(schema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateSchemaWhenNoIdentifier() throws SchemaNotFoundException {
final RecordSchema schema = new SimpleRecordSchema(fields, null);
schemaAccessWriter.validateSchema(schema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateSchemaWhenNoName() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(1L).build();
final RecordSchema schema = new SimpleRecordSchema(fields, schemaIdentifier);
schemaAccessWriter.validateSchema(schema);
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.when;
public class TestSchemaNamePropertyStrategy extends AbstractSchemaAccessStrategyTest {
@Test
public void testNameOnly() throws SchemaNotFoundException, IOException {
final PropertyValue nameValue = new MockPropertyValue("person");
final PropertyValue branchValue = new MockPropertyValue(null);
final PropertyValue versionValue = new MockPropertyValue(null);
final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new SchemaNamePropertyStrategy(
schemaRegistry, nameValue, branchValue, versionValue);
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.name(nameValue.getValue())
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
assertNotNull(retrievedSchema);
}
@Test
public void testNameAndVersion() throws SchemaNotFoundException, IOException {
final PropertyValue nameValue = new MockPropertyValue("person");
final PropertyValue branchValue = new MockPropertyValue(null);
final PropertyValue versionValue = new MockPropertyValue("1");
final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new SchemaNamePropertyStrategy(
schemaRegistry, nameValue, branchValue, versionValue);
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.name(nameValue.getValue())
.version(versionValue.asInteger())
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
assertNotNull(retrievedSchema);
}
@Test
public void testNameAndBlankVersion() throws SchemaNotFoundException, IOException {
final PropertyValue nameValue = new MockPropertyValue("person");
final PropertyValue branchValue = new MockPropertyValue(null);
final PropertyValue versionValue = new MockPropertyValue(" ");
final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new SchemaNamePropertyStrategy(
schemaRegistry, nameValue, branchValue, versionValue);
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.name(nameValue.getValue())
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
assertNotNull(retrievedSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testNameAndNonNumericVersion() throws SchemaNotFoundException, IOException {
final PropertyValue nameValue = new MockPropertyValue("person");
final PropertyValue branchValue = new MockPropertyValue(null);
final PropertyValue versionValue = new MockPropertyValue("XYZ");
final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new SchemaNamePropertyStrategy(
schemaRegistry, nameValue, branchValue, versionValue);
schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
}
@Test
public void testNameAndBranch() throws SchemaNotFoundException, IOException {
final PropertyValue nameValue = new MockPropertyValue("person");
final PropertyValue branchValue = new MockPropertyValue("test");
final PropertyValue versionValue = new MockPropertyValue(null);
final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new SchemaNamePropertyStrategy(
schemaRegistry, nameValue, branchValue, versionValue);
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.name(nameValue.getValue())
.branch(branchValue.getValue())
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
assertNotNull(retrievedSchema);
}
@Test
public void testNameAndBlankBranch() throws SchemaNotFoundException, IOException {
final PropertyValue nameValue = new MockPropertyValue("person");
final PropertyValue branchValue = new MockPropertyValue(" ");
final PropertyValue versionValue = new MockPropertyValue(null);
final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new SchemaNamePropertyStrategy(
schemaRegistry, nameValue, branchValue, versionValue);
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.name(nameValue.getValue())
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
assertNotNull(retrievedSchema);
}
}

View File

@ -16,31 +16,14 @@
*/
package org.apache.nifi.schemaregistry.services;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
@ -49,13 +32,24 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Tags({"schema", "registry", "avro", "json", "csv"})
@CapabilityDescription("Provides a service for registering and accessing schemas. You can register a schema "
+ "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
+ "representation of the actual schema following the syntax and semantics of Avro's Schema format.")
public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
private final Map<String, String> schemaNameToSchemaMap;
private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();
static final PropertyDescriptor VALIDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
@ -70,9 +64,6 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
private List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
public AvroSchemaRegistry() {
this.schemaNameToSchemaMap = new HashMap<>();
}
@Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
@ -92,7 +83,7 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
try {
// Use a non-strict parser here, a strict parse can be done (if specified) in customValidate().
final Schema avroSchema = new Schema.Parser().setValidate(false).parse(newValue);
final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName());
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(descriptor.getName()).build();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId);
recordSchemas.put(descriptor.getName(), recordSchema);
} catch (final Exception e) {
@ -127,18 +118,7 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
return results;
}
@Override
public String retrieveSchemaText(final String schemaName) throws SchemaNotFoundException {
final String schemaText = schemaNameToSchemaMap.get(schemaName);
if (schemaText == null) {
throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
}
return schemaText;
}
@Override
public RecordSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException {
private RecordSchema retrieveSchemaByName(final String schemaName) throws SchemaNotFoundException {
final RecordSchema recordSchema = recordSchemas.get(schemaName);
if (recordSchema == null) {
throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
@ -147,26 +127,13 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
}
@Override
public RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException {
throw new SchemaNotFoundException("This Schema Registry does not support schema lookup by identifier and version - only by name.");
}
@Override
public String retrieveSchemaText(long schemaId, int version) throws IOException, SchemaNotFoundException {
throw new SchemaNotFoundException("This Schema Registry does not support schema lookup by identifier and version - only by name.");
}
@OnDisabled
public void close() throws Exception {
schemaNameToSchemaMap.clear();
}
@OnEnabled
public void enable(final ConfigurationContext configurationContext) throws InitializationException {
this.schemaNameToSchemaMap.putAll(configurationContext.getProperties().entrySet().stream()
.filter(propEntry -> propEntry.getKey().isDynamic())
.collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
final Optional<String> schemaName = schemaIdentifier.getName();
if (schemaName.isPresent()) {
return retrieveSchemaByName(schemaName.get());
} else {
throw new SchemaNotFoundException("This Schema Registry only supports retrieving a schema by name.");
}
}
@Override

View File

@ -31,6 +31,8 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Assert;
import org.junit.Test;
@ -39,8 +41,7 @@ public class TestAvroSchemaRegistry {
@Test
public void validateSchemaRegistrationFromrDynamicProperties() throws Exception {
String schemaName = "fooSchema";
ConfigurationContext configContext = mock(ConfigurationContext.class);
Map<PropertyDescriptor, String> properties = new HashMap<>();
PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
.name(schemaName)
.dynamic(true)
@ -54,20 +55,20 @@ public class TestAvroSchemaRegistry {
.name("barSchema")
.dynamic(false)
.build();
properties.put(fooSchema, fooSchemaText);
properties.put(barSchema, "");
when(configContext.getProperties()).thenReturn(properties);
AvroSchemaRegistry delegate = new AvroSchemaRegistry();
delegate.enable(configContext);
String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
assertEquals(fooSchemaText, locatedSchemaText);
delegate.onPropertyModified(fooSchema, null, fooSchemaText);
delegate.onPropertyModified(barSchema, null, "");
SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build();
RecordSchema locatedSchema = delegate.retrieveSchema(schemaIdentifier);
assertEquals(fooSchemaText, locatedSchema.getSchemaText().get());
try {
delegate.retrieveSchemaText("barSchema");
delegate.retrieveSchema(SchemaIdentifier.builder().name("barSchema").build());
Assert.fail("Expected a SchemaNotFoundException to be thrown but it was not");
} catch (final SchemaNotFoundException expected) {
}
delegate.close();
}
@Test
@ -109,7 +110,5 @@ public class TestAvroSchemaRegistry {
when(propertyValue.asBoolean()).thenReturn(false);
results = delegate.customValidate(validationContext);
results.forEach(result -> assertTrue(result.isValid()));
delegate.close();
}
}

View File

@ -18,19 +18,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.nifi.annotation.behavior.EventDriven;
@ -70,10 +57,24 @@ import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RawRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.serialization.record.validation.RecordSchemaValidator;
import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
import org.apache.nifi.serialization.record.validation.ValidationError;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@EventDriven
@SideEffectFree
@SupportsBatching
@ -450,7 +451,8 @@ public class ValidateRecord extends AbstractProcessor {
} else if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
return schemaRegistry.retrieveSchema(schemaName);
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
} else if (schemaAccessStrategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) {
final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue();
final Parser parser = new Schema.Parser();

View File

@ -28,7 +28,7 @@ limitations under the License.
<artifactId>nifi-hwx-schema-registry-service</artifactId>
<packaging>jar</packaging>
<properties>
<hwx.registry.version>0.3.0</hwx.registry.version>
<hwx.registry.version>0.5.1</hwx.registry.version>
<jackson.version>2.9.1</jackson.version>
</properties>
<dependencies>

View File

@ -16,23 +16,19 @@
*/
package org.apache.nifi.schemaregistry.hortonworks;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import com.hortonworks.registries.schemaregistry.SchemaMetadata;
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@ -44,21 +40,28 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.Tuple;
import com.hortonworks.registries.schemaregistry.SchemaMetadata;
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@Tags({"schema", "registry", "avro", "hortonworks", "hwx"})
@CapabilityDescription("Provides a Schema Registry Service that interacts with a Hortonworks Schema Registry, available at https://github.com/hortonworks/registry")
public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_BRANCH_NAME, SchemaField.SCHEMA_TEXT,
SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
private final ConcurrentMap<Tuple<SchemaIdentifier, String>, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>();
private final ConcurrentMap<Tuple<String,String>, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>();
private final ConcurrentMap<SchemaVersionKey, Tuple<SchemaVersionInfo, Long>> schemaVersionByKeyCache = new ConcurrentHashMap<>();
private volatile long versionInfoCacheNanos;
@ -149,10 +152,13 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
return schemaRegistryClient;
}
private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName, final String branchName)
throws org.apache.nifi.schema.access.SchemaNotFoundException {
try {
// Try to fetch the SchemaVersionInfo from the cache.
final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByNameCache.get(schemaName);
final Tuple<String,String> nameAndBranch = new Tuple<>(schemaName, branchName);
final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByNameCache.get(nameAndBranch);
// Determine if the timestampedVersionInfo is expired
boolean fetch = false;
@ -169,14 +175,20 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
}
// schema version info was expired or not found in cache. Fetch from schema registry
final SchemaVersionInfo versionInfo = client.getLatestSchemaVersionInfo(schemaName);
final SchemaVersionInfo versionInfo;
if (StringUtils.isBlank(branchName)) {
versionInfo = client.getLatestSchemaVersionInfo(schemaName);
} else {
versionInfo = client.getLatestSchemaVersionInfo(branchName, schemaName);
}
if (versionInfo == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
// Store new version in cache.
final Tuple<SchemaVersionInfo, Long> tuple = new Tuple<>(versionInfo, System.nanoTime());
schemaVersionByNameCache.put(schemaName, tuple);
schemaVersionByNameCache.put(nameAndBranch, tuple);
return versionInfo;
} catch (final SchemaNotFoundException e) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
@ -217,23 +229,23 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
}
}
@Override
public String retrieveSchemaText(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
final SchemaVersionInfo latest = getLatestSchemaVersionInfo(getClient(), schemaName);
return latest.getSchemaText();
}
private RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
@Override
public RecordSchema retrieveSchema(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
final SchemaRegistryClient client = getClient();
final SchemaVersionInfo versionInfo;
final Long schemaId;
final Integer version;
final Optional<String> schemaName = schemaIdentifier.getName();
if (!schemaName.isPresent()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
}
final Optional<String> schemaBranchName = schemaIdentifier.getBranch();
final OptionalInt schemaVersion = schemaIdentifier.getVersion();
try {
final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName);
final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName.get());
if (metadataInfo == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
@ -243,61 +255,59 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
versionInfo = getLatestSchemaVersionInfo(client, schemaName);
version = versionInfo.getVersion();
if (version == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
// possible scenarios are name only, name + branch, or name + version
if (schemaVersion.isPresent()) {
final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName.get(), schemaVersion.getAsInt());
versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
} else {
versionInfo = getLatestSchemaVersionInfo(client, schemaName.get(), schemaBranchName.orElse(null));
}
if (versionInfo == null || versionInfo.getVersion() == null) {
final String message = createErrorMessage("Could not find schema", schemaName, schemaBranchName, schemaVersion);
throw new org.apache.nifi.schema.access.SchemaNotFoundException(message);
}
} catch (final Exception e) {
handleException("Failed to retrieve schema with name '" + schemaName + "'", e);
final String message = createErrorMessage("Failed to retrieve schema", schemaName, schemaBranchName, schemaVersion);
handleException(message, e);
return null;
}
final String schemaText = versionInfo.getSchemaText();
final SchemaIdentifier schemaIdentifier = (schemaId == null || version == null) ? SchemaIdentifier.ofName(schemaName) : SchemaIdentifier.of(schemaName, schemaId, version);
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
final SchemaIdentifier resultSchemaIdentifier = SchemaIdentifier.builder()
.id(schemaId)
.name(schemaName.get())
.branch(schemaBranchName.orElse(null))
.version(versionInfo.getVersion())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
final Schema schema = new Schema.Parser().parse(schemaText);
return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier);
return AvroTypeUtil.createSchema(schema, schemaText, resultSchemaIdentifier);
});
}
@Override
public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
final SchemaRegistryClient client = getClient();
try {
final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
if (info == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
}
final SchemaMetadata metadata = info.getSchemaMetadata();
final String schemaName = metadata.getName();
final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
if (versionInfo == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
}
return versionInfo.getSchemaText();
} catch (final Exception e) {
handleException("Failed to retrieve schema with ID '" + schemaId + "' and version '" + version + "'", e);
return null;
}
}
@Override
public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier schemaIdentifier) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
final SchemaRegistryClient client = getClient();
final String schemaName;
final SchemaVersionInfo versionInfo;
final OptionalLong schemaId = schemaIdentifier.getIdentifier();
if (!schemaId.isPresent()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
}
final OptionalInt version = schemaIdentifier.getVersion();
if (!version.isPresent()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Version is not present");
}
try {
final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId.getAsLong());
if (info == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
}
@ -305,7 +315,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
final SchemaMetadata metadata = info.getSchemaMetadata();
schemaName = metadata.getName();
final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version.getAsInt());
versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
if (versionInfo == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
@ -317,14 +327,45 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
final String schemaText = versionInfo.getSchemaText();
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version);
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
final SchemaIdentifier resultSchemaIdentifier = SchemaIdentifier.builder()
.name(schemaName)
.id(schemaId.getAsLong())
.version(version.getAsInt())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
final Schema schema = new Schema.Parser().parse(schemaText);
return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier);
return AvroTypeUtil.createSchema(schema, schemaText, resultSchemaIdentifier);
});
}
@Override
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
if (schemaIdentifier.getIdentifier().isPresent()) {
return retrieveSchemaByIdAndVersion(schemaIdentifier);
} else {
return retrieveSchemaByName(schemaIdentifier);
}
}
private String createErrorMessage(final String baseMessage, final Optional<String> schemaName, final Optional<String> branchName, final OptionalInt version) {
final StringBuilder builder = new StringBuilder(baseMessage)
.append(" with name '")
.append(schemaName.orElse("null"))
.append("'");
if (branchName.isPresent()) {
builder.append(" and branch '").append(branchName.get()).append("'");
}
if (version.isPresent()) {
builder.append(" and version '").append(version.getAsInt()).append("'");
}
return builder.toString();
}
// The schema registry client wraps all IOExceptions in RuntimeException. So if an IOException occurs, we don't know
// that it was an IO problem. So we will look through the Exception's cause chain to see if there is an IOException present.
private void handleException(final String message, final Exception e) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {

View File

@ -17,20 +17,16 @@
package org.apache.nifi.schemaregistry.hortonworks;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import java.lang.reflect.Constructor;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import com.hortonworks.registries.schemaregistry.SchemaCompatibility;
import com.hortonworks.registries.schemaregistry.SchemaMetadata;
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.MockConfigurationContext;
import org.junit.Before;
import org.junit.Ignore;
@ -39,12 +35,16 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.hortonworks.registries.schemaregistry.SchemaCompatibility;
import com.hortonworks.registries.schemaregistry.SchemaMetadata;
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import java.lang.reflect.Constructor;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
public class TestHortonworksSchemaRegistry {
private HortonworksSchemaRegistry registry;
@ -124,7 +124,8 @@ public class TestHortonworksSchemaRegistry {
registry.enable(configurationContext);
for (int i = 0; i < 10000; i++) {
final RecordSchema schema = registry.retrieveSchema("unit-test");
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("unit-test").build();
final RecordSchema schema = registry.retrieveSchema(schemaIdentifier);
assertNotNull(schema);
}
@ -161,7 +162,8 @@ public class TestHortonworksSchemaRegistry {
registry.enable(configurationContext);
for (int i = 0; i < 2; i++) {
final RecordSchema schema = registry.retrieveSchema("unit-test");
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("unit-test").build();
final RecordSchema schema = registry.retrieveSchema(schemaIdentifier);
assertNotNull(schema);
}
@ -170,7 +172,8 @@ public class TestHortonworksSchemaRegistry {
Thread.sleep(2000L);
for (int i = 0; i < 2; i++) {
final RecordSchema schema = registry.retrieveSchema("unit-test");
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("unit-test").build();
final RecordSchema schema = registry.retrieveSchema(schemaIdentifier);
assertNotNull(schema);
}

View File

@ -47,6 +47,8 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODE
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
@ -79,6 +81,8 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
properties.add(SCHEMA_REGISTRY);
properties.add(SCHEMA_NAME);
properties.add(SCHEMA_VERSION);
properties.add(SCHEMA_BRANCH_NAME);
properties.add(SCHEMA_TEXT);
return properties;

View File

@ -16,23 +16,24 @@
*/
package org.apache.nifi.schemaregistry.services;
import java.io.IOException;
import java.util.Set;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.util.Set;
/**
* Represents {@link ControllerService} strategy to expose internal and/or
* integrate with external Schema Registry
*/
public interface SchemaRegistry extends ControllerService {
/**
* @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
*
* Retrieves and returns the textual representation of the schema based on
* the provided name of the schema available in Schema Registry.
*
@ -41,9 +42,17 @@ public interface SchemaRegistry extends ControllerService {
* @throws IOException if unable to communicate with the backing store
* @throws SchemaNotFoundException if unable to find the schema with the given name
*/
String retrieveSchemaText(String schemaName) throws IOException, SchemaNotFoundException;
default String retrieveSchemaText(String schemaName) throws IOException, SchemaNotFoundException {
final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
if (recordSchema == null) {
throw new SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
return recordSchema.getSchemaText().get();
}
/**
* @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
*
* Retrieves the textual representation of the schema with the given ID and version
*
* @param schemaId the unique identifier for the desired schema
@ -53,25 +62,37 @@ public interface SchemaRegistry extends ControllerService {
* @throws IOException if unable to communicate with the backing store
* @throws SchemaNotFoundException if unable to find the schema with the given id and version
*/
String retrieveSchemaText(long schemaId, int version) throws IOException, SchemaNotFoundException;
default String retrieveSchemaText(long schemaId, int version) throws IOException, SchemaNotFoundException {
final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
if (recordSchema == null) {
throw new SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
}
return recordSchema.getSchemaText().get();
}
/**
* @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
*
* Retrieves and returns the RecordSchema based on the provided name of the schema available in Schema Registry. The RecordSchema
* that is returned must have the Schema's name populated in its SchemaIdentifier. I.e., a call to
* {@link RecordSchema}.{@link RecordSchema#getIdentifier() getIdentifier()}.{@link SchemaIdentifier#getName() getName()}
* will always return an {@link Optional} that is not empty.
* will always return an {@link java.util.Optional} that is not empty.
*
* @return the latest version of the schema with the given name, or <code>null</code> if no schema can be found with the given name.
* @throws SchemaNotFoundException if unable to find the schema with the given name
*/
RecordSchema retrieveSchema(String schemaName) throws IOException, SchemaNotFoundException;
default RecordSchema retrieveSchema(String schemaName) throws IOException, SchemaNotFoundException {
return retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
}
/**
* @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
*
* Retrieves the schema with the given ID and version. The RecordSchema that is returned must have the Schema's identifier and version
* populated in its SchemaIdentifier. I.e., a call to
* {@link RecordSchema}.{@link RecordSchema#getIdentifier() getIdentifier()}.{@link SchemaIdentifier#getIdentifier() getIdentifier()}
* will always return an {@link Optional} that is not empty, as will a call to
* will always return an {@link java.util.Optional} that is not empty, as will a call to
* {@link RecordSchema}.{@link RecordSchema#getIdentifier() getIdentifier()}.{@link SchemaIdentifier#getVersion() getVersion()}.
*
* @param schemaId the unique identifier for the desired schema
@ -82,10 +103,23 @@ public interface SchemaRegistry extends ControllerService {
* @throws IOException if unable to communicate with the backing store
* @throws SchemaNotFoundException if unable to find the schema with the given id and version
*/
RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException;
default RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException {
return retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
}
/**
* @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #retrieveSchema(String)} and {@link #retrieveSchema(long, int)}
* Retrieves the schema based on the provided descriptor. The descriptor must contain and schemaIdentifier or name, but not both, along
* with a version, and an optional branch name. For implementations that do not support branching, the branch name will be ignored.
*
* @param schemaIdentifier a schema schemaIdentifier
* @return the schema for the given descriptor
* @throws IOException if unable to communicate with the backing store
* @throws SchemaNotFoundException if unable to find the schema based on the given descriptor
*/
RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException;
/**
* @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #retrieveSchema(SchemaIdentifier)}
*/
Set<SchemaField> getSuppliedSchemaFields();
}