NIFI-8785 Confluent Schema Registry REST client refactoring

- Added debug logs and a new method to get schema info without making subjects API calls

This closes #5250

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2021-07-22 18:53:13 +02:00 committed by exceptionfactory
parent a466e714a0
commit 5117fc0619
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 107 additions and 35 deletions

View File

@ -122,34 +122,71 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
// Check if we have cached the Identifier to Name mapping
JsonNode completeSchema = null;
// We get the schema definition using the ID of the schema
// GET /schemas/ids/{int: id}
final String schemaPath = getSchemaPath(schemaId);
final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId);
//Get subject name by id, works only with v5.3.1+ Confluent Schema Registry
final JsonNode schemaJson = fetchJsonResponse(schemaPath, "id " + schemaId);
// Get subject name by id, works only with v5.3.1+ Confluent Schema Registry
// GET /schemas/ids/{int: id}/subjects
JsonNode subjectsJson = null;
try {
subjectsJson = fetchJsonResponse(schemaPath + "/subjects", "schema name");
if(subjectsJson != null) {
final ArrayNode subjectsList = (ArrayNode) subjectsJson;
for (JsonNode subject: subjectsList) {
final String searchName = subject.asText();
try {
// get complete schema (name + id + version) using the subject name API
completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId);
break;
} catch (SchemaNotFoundException e) {
logger.debug("Could not find schema in registry by subject name " + searchName, e);
continue;
}
}
}
} catch (SchemaNotFoundException e) {
logger.debug("Could not find schema name in registry by id in: + " + schemaPath);
logger.debug("Could not find schema name in registry by id in: " + schemaPath);
}
JsonNode completeSchema = null;
if(subjectsJson == null) {
// Get all couples (subject name, version) for a given schema ID
// GET /schemas/ids/{int: id}/versions
if(completeSchema == null) {
JsonNode subjectsVersions = fetchJsonResponse(schemaPath + "/versions", "schema name");
if(subjectsVersions != null) {
final ArrayNode subjectsVersionsList = (ArrayNode) subjectsVersions;
// we want to make sure we get the latest version
int maxVersion = 0;
String subjectName = null;
for (JsonNode subjectVersion: subjectsVersionsList) {
int currentVersion = subjectVersion.get(VERSION_FIELD_NAME).asInt();
String currentSubjectName = subjectVersion.get(SUBJECT_FIELD_NAME).asText();
if(currentVersion > maxVersion) {
maxVersion = currentVersion;
subjectName = currentSubjectName;
}
}
if(subjectName != null) {
return createRecordSchema(subjectName, maxVersion, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText());
}
}
}
// Last resort option: we get the full list of subjects and check one by one to get the complete schema info
if(completeSchema == null) {
final JsonNode subjectsAllJson = fetchJsonResponse("/subjects", "subjects array");
final ArrayNode subjectsAllList = (ArrayNode) subjectsAllJson;
for (JsonNode subject: subjectsAllList) {
try {
final String searchName = subject.asText();
completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
break;
} catch (SchemaNotFoundException e) {
continue;
}
}
} else {
final ArrayNode subjectsList = (ArrayNode) subjectsJson;
for (JsonNode subject: subjectsList) {
try {
final String searchName = subject.asText();
completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId);
break;
} catch (SchemaNotFoundException e) {
continue;
@ -158,12 +195,23 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
}
if(completeSchema == null) {
throw new SchemaNotFoundException("could not get schema with id: " + schemaId);
throw new SchemaNotFoundException("Could not get schema with id: " + schemaId);
}
return createRecordSchema(completeSchema);
}
private RecordSchema createRecordSchema(final String name, final int version, final int id, final String schema) throws SchemaNotFoundException {
try {
final Schema avroSchema = new Schema.Parser().parse(schema);
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(name).id((long) id).version(version).build();
return AvroTypeUtil.createSchema(avroSchema, schema, schemaId);
} catch (final SchemaParseException spe) {
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + name
+ " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
}
}
private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
final String subject = schemaNode.get(SUBJECT_FIELD_NAME).asText();
final int version = schemaNode.get(VERSION_FIELD_NAME).asInt();
@ -173,7 +221,6 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
try {
final Schema avroSchema = new Schema.Parser().parse(schemaText);
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id((long) id).version(version).build();
return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
} catch (final SchemaParseException spe) {
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject
@ -196,6 +243,9 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;
logger.debug("POST JSON response URL {}", url);
final WebTarget webTarget = client.target(url);
Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE);
for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
@ -204,26 +254,40 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final Response response = builder.post(Entity.json(schema.toString()));
final int responseCode = response.getStatus();
if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
continue;
}
switch (Response.Status.fromStatusCode(responseCode)) {
case OK:
JsonNode jsonResponse = response.readEntity(JsonNode.class);
if(responseCode == Response.Status.OK.getStatusCode()) {
return response.readEntity(JsonNode.class);
if (logger.isDebugEnabled()) {
logger.debug("JSON Response: {}", jsonResponse);
}
return jsonResponse;
case NOT_FOUND:
logger.debug("Could not find Schema {} from Registry {}", schemaDescription, baseUrl);
continue;
default:
errorMessage = response.readEntity(String.class);
continue;
}
}
throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: "
throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription
+ " from any of the Confluent Schema Registry URL's provided; failure response message: "
+ errorMessage);
}
private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException {
private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException {
String errorMessage = null;
for (final String baseUrl : baseUrls) {
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;
logger.debug("GET JSON response URL {}", url);
final WebTarget webTarget = client.target(url);
Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON);
for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
@ -232,20 +296,28 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final Response response = builder.get();
final int responseCode = response.getStatus();
if (responseCode == Response.Status.OK.getStatusCode()) {
return response.readEntity(JsonNode.class);
}
switch (Response.Status.fromStatusCode(responseCode)) {
case OK:
JsonNode jsonResponse = response.readEntity(JsonNode.class);
if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
throw new SchemaNotFoundException("Could not find Schema with " + schemaDescription + " from the Confluent Schema Registry located at " + baseUrl);
}
if (logger.isDebugEnabled()) {
logger.debug("JSON Response {}", jsonResponse);
}
if (errorMessage == null) {
errorMessage = response.readEntity(String.class);
return jsonResponse;
case NOT_FOUND:
logger.debug("Could not find Schema {} from Registry {}", schemaDescription, baseUrl);
continue;
default:
errorMessage = response.readEntity(String.class);
continue;
}
}
throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage);
throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription
+ " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage);
}
private String getTrimmedBase(String baseUrl) {