NIFI-6011 support for retrieving named schema versions

This closes #3297

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Alex Savitsky 2019-02-08 10:16:11 -05:00 committed by Mike Thomsen
parent a2bacde62c
commit 060f4fe73f
4 changed files with 39 additions and 20 deletions

View File

@ -185,7 +185,13 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
}
final RecordSchema schema = client.getSchema(schemaName.get());
final RecordSchema schema;
if (schemaIdentifier.getVersion().isPresent()) {
schema = client.getSchema(schemaName.get(), schemaIdentifier.getVersion().getAsInt());
} else {
schema = client.getSchema(schemaName.get());
}
return schema;
}

View File

@ -19,17 +19,16 @@ package org.apache.nifi.confluent.schemaregistry.client;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.time.Duration;
public class CachingSchemaRegistryClient implements SchemaRegistryClient {
private final SchemaRegistryClient client;
private final LoadingCache<String, RecordSchema> nameCache;
private final LoadingCache<Pair<String, Integer>, RecordSchema> nameVersionCache;
private final LoadingCache<Integer, RecordSchema> idCache;
@ -40,6 +39,10 @@ public class CachingSchemaRegistryClient implements SchemaRegistryClient {
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos))
.build(client::getSchema);
nameVersionCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos))
.build(key -> client.getSchema(key.getLeft(), key.getRight()));
idCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos))
@ -47,12 +50,17 @@ public class CachingSchemaRegistryClient implements SchemaRegistryClient {
}
@Override
public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
public RecordSchema getSchema(final String schemaName) {
return nameCache.get(schemaName);
}
@Override
public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException {
public RecordSchema getSchema(String schemaName, int version) {
return nameVersionCache.get(Pair.of(schemaName, version));
}
@Override
public RecordSchema getSchema(final int schemaId) {
return idCache.get(schemaId);
}

View File

@ -82,13 +82,19 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
@Override
public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
final String pathSuffix = getSubjectPath(schemaName);
final String pathSuffix = getSubjectPath(schemaName, null);
final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName);
final RecordSchema recordSchema = createRecordSchema(responseJson);
return recordSchema;
return createRecordSchema(responseJson);
}
@Override
public RecordSchema getSchema(final String schemaName, final int schemaVersion) throws IOException, SchemaNotFoundException {
final String pathSuffix = getSubjectPath(schemaName, schemaVersion);
final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName);
return createRecordSchema(responseJson);
}
@Override
public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException {
@ -121,8 +127,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
throw new SchemaNotFoundException("could not get schema with id: " + schemaId);
}
final RecordSchema recordSchema = createRecordSchema(completeSchema);
return recordSchema;
return createRecordSchema(completeSchema);
}
private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
@ -133,18 +138,18 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
try {
final Schema avroSchema = new Schema.Parser().parse(schemaText);
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id(Long.valueOf(id)).version(version).build();
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id((long) id).version(version).build();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
return recordSchema;
return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
} catch (final SchemaParseException spe) {
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject
+ " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
}
}
private String getSubjectPath(final String schemaName) throws UnsupportedEncodingException {
return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest";
private String getSubjectPath(final String schemaName, final Integer schemaVersion) throws UnsupportedEncodingException {
return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/" +
(schemaVersion == null ? "latest" : URLEncoder.encode(String.valueOf(schemaVersion), "UTF-8"));
}
private String getSchemaPath(final int schemaId) throws UnsupportedEncodingException {
@ -166,8 +171,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
}
if(responseCode == Response.Status.OK.getStatusCode()) {
final JsonNode responseJson = response.readEntity(JsonNode.class);
return responseJson;
return response.readEntity(JsonNode.class);
}
}
@ -187,8 +191,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final int responseCode = response.getStatus();
if (responseCode == Response.Status.OK.getStatusCode()) {
final JsonNode responseJson = response.readEntity(JsonNode.class);
return responseJson;
return response.readEntity(JsonNode.class);
}
if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {

View File

@ -26,5 +26,7 @@ public interface SchemaRegistryClient {
RecordSchema getSchema(String schemaName) throws IOException, SchemaNotFoundException;
RecordSchema getSchema(String schemaName, int version) throws IOException, SchemaNotFoundException;
RecordSchema getSchema(int schemaId) throws IOException, SchemaNotFoundException;
}