From 060f4fe73f087b9de103905ee3d8dd1d1b2ee64c Mon Sep 17 00:00:00 2001 From: Alex Savitsky Date: Fri, 8 Feb 2019 10:16:11 -0500 Subject: [PATCH] NIFI-6011 support for retrieving named schema versions This closes #3297 Signed-off-by: Mike Thomsen --- .../ConfluentSchemaRegistry.java | 8 ++++- .../client/CachingSchemaRegistryClient.java | 18 ++++++++--- .../client/RestSchemaRegistryClient.java | 31 ++++++++++--------- .../client/SchemaRegistryClient.java | 2 ++ 4 files changed, 39 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java index 2bf3a612fd..3567392d77 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java @@ -185,7 +185,13 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present"); } - final RecordSchema schema = client.getSchema(schemaName.get()); + final RecordSchema schema; + if (schemaIdentifier.getVersion().isPresent()) { + schema = client.getSchema(schemaName.get(), schemaIdentifier.getVersion().getAsInt()); + } else { + schema = client.getSchema(schemaName.get()); + } + return schema; } diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java index 9075ac2eee..365d5009e5 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java @@ -19,17 +19,16 @@ package org.apache.nifi.confluent.schemaregistry.client; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; -import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.serialization.record.RecordSchema; -import java.io.IOException; import java.time.Duration; - public class CachingSchemaRegistryClient implements SchemaRegistryClient { private final SchemaRegistryClient client; private final LoadingCache nameCache; + private final LoadingCache, RecordSchema> nameVersionCache; private final LoadingCache idCache; @@ -40,6 +39,10 @@ public class CachingSchemaRegistryClient implements SchemaRegistryClient { .maximumSize(cacheSize) .expireAfterWrite(Duration.ofNanos(expirationNanos)) .build(client::getSchema); + nameVersionCache = Caffeine.newBuilder() + .maximumSize(cacheSize) + .expireAfterWrite(Duration.ofNanos(expirationNanos)) + .build(key -> client.getSchema(key.getLeft(), key.getRight())); idCache = Caffeine.newBuilder() .maximumSize(cacheSize) .expireAfterWrite(Duration.ofNanos(expirationNanos)) @@ -47,12 +50,17 @@ public class CachingSchemaRegistryClient implements SchemaRegistryClient { } @Override - public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException { + public RecordSchema getSchema(final String schemaName) { return nameCache.get(schemaName); } @Override - public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException { + public RecordSchema getSchema(String schemaName, int version) { + return nameVersionCache.get(Pair.of(schemaName, version)); + } + + @Override + public RecordSchema getSchema(final int schemaId) { return idCache.get(schemaId); } diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java index 31d9801c14..1bca29cafc 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java @@ -82,13 +82,19 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { @Override public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException { - final String pathSuffix = getSubjectPath(schemaName); + final String pathSuffix = getSubjectPath(schemaName, null); final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName); - final RecordSchema recordSchema = createRecordSchema(responseJson); - return recordSchema; + return createRecordSchema(responseJson); } + @Override + public RecordSchema getSchema(final String schemaName, final int schemaVersion) throws IOException, SchemaNotFoundException { + final String pathSuffix = getSubjectPath(schemaName, schemaVersion); + final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName); + + return createRecordSchema(responseJson); + } @Override public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException { @@ -121,8 +127,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { throw new SchemaNotFoundException("could not get schema with id: " + schemaId); } - final RecordSchema recordSchema = createRecordSchema(completeSchema); - return recordSchema; + return createRecordSchema(completeSchema); } private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException { @@ -133,18 +138,18 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { try { final Schema avroSchema = new Schema.Parser().parse(schemaText); - final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id(Long.valueOf(id)).version(version).build(); + final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id((long) id).version(version).build(); - final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId); - return recordSchema; + return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId); } catch (final SchemaParseException spe) { throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema"); } } - private String getSubjectPath(final String schemaName) throws UnsupportedEncodingException { - return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest"; + private String getSubjectPath(final String schemaName, final Integer schemaVersion) throws UnsupportedEncodingException { + return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/" + + (schemaVersion == null ? "latest" : URLEncoder.encode(String.valueOf(schemaVersion), "UTF-8")); } private String getSchemaPath(final int schemaId) throws UnsupportedEncodingException { @@ -166,8 +171,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { } if(responseCode == Response.Status.OK.getStatusCode()) { - final JsonNode responseJson = response.readEntity(JsonNode.class); - return responseJson; + return response.readEntity(JsonNode.class); } } @@ -187,8 +191,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { final int responseCode = response.getStatus(); if (responseCode == Response.Status.OK.getStatusCode()) { - final JsonNode responseJson = response.readEntity(JsonNode.class); - return responseJson; + return response.readEntity(JsonNode.class); } if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) { diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java index 3c8c0cb5aa..c12c2587b5 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java @@ -26,5 +26,7 @@ public interface SchemaRegistryClient { RecordSchema getSchema(String schemaName) throws IOException, SchemaNotFoundException; + RecordSchema getSchema(String schemaName, int version) throws IOException, SchemaNotFoundException; + RecordSchema getSchema(int schemaId) throws IOException, SchemaNotFoundException; }