NIFI-7221 Initial work

This commit is contained in:
Pierre Villard 2020-03-04 19:25:26 +01:00 committed by Bryan Bende
parent f694e6464f
commit 1fe79021b5
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
8 changed files with 259 additions and 86 deletions

View File

@ -22,6 +22,7 @@ public enum SchemaField {
SCHEMA_TEXT_FORMAT("Schema Text Format"),
SCHEMA_NAME("Schema Name"),
SCHEMA_IDENTIFIER("Schema Identifier"),
SCHEMA_VERSION_ID("Schema-Version Identifier"),
SCHEMA_VERSION("Schema Version"),
SCHEMA_BRANCH_NAME("Schema Branch Name");

View File

@ -38,13 +38,23 @@ public interface SchemaIdentifier {
*/
OptionalInt getVersion();
/**
* @return the schema version ID of the schema, if one has been defined.
*/
OptionalLong getSchemaVersionId();
/**
* @return the name of the branch where the schema is located, if one has been defined
*/
Optional<String> getBranch();
/**
* @return the protocol used to get this schema identifier
*/
Integer getProtocol();
SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null);
SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1);
static Builder builder() {
return new StandardSchemaIdentifier.Builder();
@ -61,8 +71,12 @@ public interface SchemaIdentifier {
Builder version(Integer version);
Builder schemaVersionId(Long schemaVersionId);
Builder branch(String branch);
Builder protocol(Integer protocol);
SchemaIdentifier build();
}

View File

@ -25,15 +25,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
private final Optional<String> name;
private final OptionalLong identifier;
private final OptionalInt version;
private final OptionalLong schemaVersionId;
private final Optional<String> branch;
private final int protocol;
StandardSchemaIdentifier(final String name, final Long identifier, final Integer version, final String branch) {
StandardSchemaIdentifier(final String name, final Long identifier, final Integer version,
final Long schemaVersionId, final String branch, final int protocol) {
this.name = Optional.ofNullable(name);
this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);;
this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);;
this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);
this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);
this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId);
this.branch = Optional.ofNullable(branch);
this.protocol = protocol;
if (this.name == null && this.identifier == null) {
if ((this.name == null && this.identifier == null) || this.schemaVersionId == null) {
throw new IllegalStateException("Name or Identifier must be provided");
}
}
@ -53,14 +58,25 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
return version;
}
@Override
public OptionalLong getSchemaVersionId() {
return schemaVersionId;
}
@Override
public Optional<String> getBranch() {
return branch;
}
@Override
public Integer getProtocol() {
return protocol;
}
@Override
public int hashCode() {
return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode() + 41 * getBranch().hashCode();
return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode()
+ 41 * getSchemaVersionId().hashCode() + 41 * getBranch().hashCode();
}
@Override
@ -78,9 +94,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
return getName().equals(other.getName())
&& getIdentifier().equals(other.getIdentifier())
&& getVersion().equals(other.getVersion())
&& getSchemaVersionId().equals(other.getSchemaVersionId())
&& getBranch().equals(other.getBranch());
}
@Override
public String toString() {
return "[ name = " + name + ", "
+ "identifier = " + identifier + ", "
+ "version = " + version + ", "
+ "schemaVersionId = " + schemaVersionId + ", "
+ "branch = " + branch + ", "
+ "protocol = " + protocol + " ]";
}
/**
* Builder to create instances of SchemaIdentifier.
*/
@ -90,6 +117,8 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
private String branch;
private Long identifier;
private Integer version;
private Long schemaVersionId;
private Integer protocol;
@Override
public SchemaIdentifier.Builder name(final String name) {
@ -115,9 +144,21 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
return this;
}
@Override
public SchemaIdentifier.Builder schemaVersionId(final Long schemaVersionId) {
this.schemaVersionId = schemaVersionId;
return this;
}
@Override
public SchemaIdentifier.Builder protocol(final Integer protocol) {
this.protocol = protocol;
return this;
}
@Override
public SchemaIdentifier build() {
return new StandardSchemaIdentifier(name, identifier, version, branch);
return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, protocol);
}
}
}

View File

@ -17,10 +17,6 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
@ -28,16 +24,20 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
private final Set<SchemaField> schemaFields;
public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier";
public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version";
public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id";
private final SchemaRegistry schemaRegistry;
static final int LATEST_PROTOCOL_VERSION = 3;
public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
@ -45,6 +45,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
schemaFields = new HashSet<>();
schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
schemaFields.add(SchemaField.SCHEMA_VERSION);
schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
}
@ -57,7 +58,8 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) {
final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: "
+ SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
}
@ -68,28 +70,34 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
}
final int protocol = Integer.parseInt(schemaProtocol);
if (protocol != 1) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be 1.");
if (protocol > LATEST_PROTOCOL_VERSION) {
throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format");
}
if (!isNumber(schemaIdentifier)) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Schema Identifier number");
SchemaIdentifier identifier;
if (!isNumber(schemaVersionId)) {
if (!isNumber(schemaIdentifier)) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Schema Identifier number");
}
if (!isNumber(schemaVersion)) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Schema Version number");
}
final long schemaId = Long.parseLong(schemaIdentifier);
final int version = Integer.parseInt(schemaVersion);
identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build();
} else {
final long svi = Long.parseLong(schemaVersionId);
identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build();
}
if (!isNumber(schemaVersion)) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Schema Version number");
}
final long schemaId = Long.parseLong(schemaIdentifier);
final int version = Integer.parseInt(schemaVersion);
final SchemaIdentifier identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
if (schema == null) {
throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'");
throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + identifier.toString() + "'");
}
return schema;

View File

@ -17,9 +17,6 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;
@ -27,9 +24,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
static final int LATEST_PROTOCOL_VERSION = 1;
static final int LATEST_PROTOCOL_VERSION = 3;
static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
@Override
@ -46,23 +46,30 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(LATEST_PROTOCOL_VERSION));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol()));
if (id.getBranch().isPresent()) {
attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
}
if (id.getSchemaVersionId().isPresent()) {
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong()));
}
return attributes;
}
@Override
public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
final SchemaIdentifier id = schema.getIdentifier();
if (!id.getIdentifier().isPresent()) {
throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier");
}
if (!id.getVersion().isPresent()) {
throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version");
final SchemaIdentifier identifier = schema.getIdentifier();
if(!identifier.getSchemaVersionId().isPresent()) {
if (!identifier.getIdentifier().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
}
if (!identifier.getVersion().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
}
}
}

View File

@ -17,11 +17,6 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -30,8 +25,13 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
private static final int LATEST_PROTOCOL_VERSION = 1;
private static final int LATEST_PROTOCOL_VERSION = 3;
private final Set<SchemaField> schemaFields;
private final SchemaRegistry schemaRegistry;
@ -47,28 +47,67 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
@Override
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final byte[] buffer = new byte[13];
final byte[] buffer = new byte[1];
try {
StreamUtils.fillBuffer(contentStream, buffer);
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read first 13 bytes from stream", ioe);
throw new SchemaNotFoundException("Could not read first byte from stream", ioe);
}
// This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
// as it is provided at:
// https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
// See: https://registry-project.readthedocs.io/en/latest/serdes.html#
final ByteBuffer bb = ByteBuffer.wrap(buffer);
final int protocolVersion = bb.get();
if (protocolVersion != 1) {
throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
SchemaIdentifier schemaIdentifier;
switch(protocolVersion) {
case 1:
final byte[] bufferv1 = new byte[12];
try {
StreamUtils.fillBuffer(contentStream, bufferv1);
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
}
final ByteBuffer bbv1 = ByteBuffer.wrap(buffer);
final long schemaId = bbv1.getLong();
final int schemaVersion = bbv1.getInt();
schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
case 2:
final byte[] bufferv2 = new byte[8];
try {
StreamUtils.fillBuffer(contentStream, bufferv2);
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
}
final ByteBuffer bbv2 = ByteBuffer.wrap(buffer);
final long sviLong = bbv2.getLong();
schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
case 3:
final byte[] bufferv3 = new byte[4];
try {
StreamUtils.fillBuffer(contentStream, bufferv3);
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
}
final ByteBuffer bbv3 = ByteBuffer.wrap(buffer);
final int sviInt = bbv3.getInt();
schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
default:
throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
}
final long schemaId = bb.getLong();
final int schemaVersion = bb.getInt();
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
}
@Override

View File

@ -17,38 +17,57 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
private static final int LATEST_PROTOCOL_VERSION = 1;
private static final int LATEST_PROTOCOL_VERSION = 3;
@Override
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
final SchemaIdentifier identifier = schema.getIdentifier();
final Long id = identifier.getIdentifier().getAsLong();
final Integer version = identifier.getVersion().getAsInt();
// This decoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
// as it is provided at:
// https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
final ByteBuffer bb = ByteBuffer.allocate(13);
bb.put((byte) LATEST_PROTOCOL_VERSION);
bb.putLong(id);
bb.putInt(version);
// This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
// See: https://registry-project.readthedocs.io/en/latest/serdes.html#
switch(identifier.getProtocol()) {
case 1:
final Long id = identifier.getIdentifier().getAsLong();
final Integer version = identifier.getVersion().getAsInt();
final ByteBuffer bbv1 = ByteBuffer.allocate(13);
bbv1.put((byte) 1);
bbv1.putLong(id);
bbv1.putInt(version);
out.write(bbv1.array());
return;
case 2:
final Long sviV2 = identifier.getIdentifier().getAsLong();
final ByteBuffer bbv2 = ByteBuffer.allocate(9);
bbv2.put((byte) 2);
bbv2.putLong(sviV2);
out.write(bbv2.array());
return;
case 3:
final Long sviV3 = identifier.getIdentifier().getAsLong();
final ByteBuffer bbv3 = ByteBuffer.allocate(5);
bbv3.put((byte) 3);
bbv3.putInt(sviV3.intValue());
out.write(bbv3.array());
return;
default:
throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + identifier.getProtocol() + " or was not encoded with this data format");
}
out.write(bb.array());
}
@Override
@ -59,14 +78,14 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
@Override
public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
final SchemaIdentifier identifier = schema.getIdentifier();
final OptionalLong identifierOption = identifier.getIdentifier();
if (!identifierOption.isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
}
final OptionalInt versionOption = identifier.getVersion();
if (!versionOption.isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
if(!identifier.getSchemaVersionId().isPresent()) {
if (!identifier.getIdentifier().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
}
if (!identifier.getVersion().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.schemaregistry.hortonworks;
import com.google.common.collect.ImmutableMap;
import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
import com.hortonworks.registries.schemaregistry.SchemaMetadata;
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
@ -64,8 +65,13 @@ import java.util.concurrent.TimeUnit;
@Tags({"schema", "registry", "avro", "hortonworks", "hwx"})
@CapabilityDescription("Provides a Schema Registry Service that interacts with a Hortonworks Schema Registry, available at https://github.com/hortonworks/registry")
public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_BRANCH_NAME, SchemaField.SCHEMA_TEXT,
SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME,
SchemaField.SCHEMA_BRANCH_NAME,
SchemaField.SCHEMA_TEXT,
SchemaField.SCHEMA_TEXT_FORMAT,
SchemaField.SCHEMA_IDENTIFIER,
SchemaField.SCHEMA_VERSION,
SchemaField.SCHEMA_VERSION_ID);
private static final String CLIENT_SSL_PROPERTY_PREFIX = "schema.registry.client.ssl";
@ -420,6 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.name(schemaName.get())
.branch(schemaBranchName.orElse(null))
.version(versionInfo.getVersion())
.protocol(schemaIdentifier.getProtocol())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@ -470,6 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.name(schemaName)
.id(schemaId.getAsLong())
.version(version.getAsInt())
.protocol(schemaIdentifier.getProtocol())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@ -481,13 +489,49 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
@Override
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
if (schemaIdentifier.getIdentifier().isPresent()) {
if (schemaIdentifier.getSchemaVersionId().isPresent()) {
return retrieveSchemaBySchemaVersionId(schemaIdentifier);
} else if (schemaIdentifier.getIdentifier().isPresent()) {
return retrieveSchemaByIdAndVersion(schemaIdentifier);
} else {
return retrieveSchemaByName(schemaIdentifier);
}
}
private RecordSchema retrieveSchemaBySchemaVersionId(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
final SchemaRegistryClient client = getClient();
final OptionalLong schemaVersionId = schemaIdentifier.getSchemaVersionId();
final SchemaIdVersion svi = new SchemaIdVersion(schemaVersionId.getAsLong());
final String schemaName;
final SchemaVersionInfo versionInfo;
try {
versionInfo = client.getSchemaVersionInfo(svi);
schemaName = versionInfo.getName();
} catch (final Exception e) {
handleException("Failed to retrieve schema with Schema Version ID '" + schemaVersionId.getAsLong() + "'", e);
return null;
}
final String schemaText = versionInfo.getSchemaText();
final SchemaIdentifier resultSchemaIdentifier = SchemaIdentifier.builder()
.name(schemaName)
.id(versionInfo.getSchemaMetadataId())
.version(versionInfo.getVersion())
.schemaVersionId(schemaVersionId.getAsLong())
.protocol(schemaIdentifier.getProtocol())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
final Schema schema = new Schema.Parser().parse(schemaText);
return AvroTypeUtil.createSchema(schema, schemaText, resultSchemaIdentifier);
});
}
private String createErrorMessage(final String baseMessage, final Optional<String> schemaName, final Optional<String> branchName, final OptionalInt version) {
final StringBuilder builder = new StringBuilder(baseMessage)
.append(" with name '")