NIFI-11828 - Confluent Schema Encoding Access Strategy - Schema ID versus Schema Version ID (#7495)

* NIFI-11828 - Confluent Schema Encoding Access Strategy - Schema ID versus Schema Version ID
* removed hard coded version 1
This commit is contained in:
Pierre Villard 2023-07-25 15:42:50 +02:00 committed by GitHub
parent d201119f0d
commit 319c974e7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 5 additions and 7 deletions

View File

@ -296,7 +296,7 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
} }
private RecordSchema retrieveSchemaById(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { private RecordSchema retrieveSchemaById(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
final OptionalLong schemaId = schemaIdentifier.getIdentifier(); final OptionalLong schemaId = schemaIdentifier.getSchemaVersionId();
if (!schemaId.isPresent()) { if (!schemaId.isPresent()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present"); throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
} }

View File

@ -54,7 +54,7 @@ public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy {
// This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer // This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer
// as it is provided at: // as it is provided at:
// http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
// The format consists of the first byte always being 0, to indicate a 'magic byte' followed by 4 bytes // The format consists of the first byte always being 0, to indicate a 'magic byte' followed by 4 bytes
// representing the schema id. // representing the schema id.
final ByteBuffer bb = ByteBuffer.wrap(buffer); final ByteBuffer bb = ByteBuffer.wrap(buffer);
@ -67,8 +67,7 @@ public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy {
final int schemaId = bb.getInt(); final int schemaId = bb.getInt();
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder() final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
.id(Long.valueOf(schemaId)) .schemaVersionId(Long.valueOf(schemaId))
.version(1)
.build(); .build();
return schemaRegistry.retrieveSchema(schemaIdentifier); return schemaRegistry.retrieveSchema(schemaIdentifier);

View File

@ -47,10 +47,9 @@ public class TestConfluentSchemaRegistryStrategy extends AbstractSchemaAccessStr
try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) { try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) {
// the confluent strategy will read the id from the input stream and use '1' as the version // the confluent strategy will read the id from the input stream
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder() final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.id((long)schemaId) .schemaVersionId((long)schemaId)
.version(1)
.build(); .build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier)))) when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))