NIFI-9593 - This closes #5679. Missing catch clauses in Confluent Schema Registry client

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Pierre Villard 2022-01-19 17:17:40 +01:00 committed by Joe Witt
parent 0da3994ac8
commit 695c3aabcb
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A

View File

@ -151,46 +151,54 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
}
} catch (SchemaNotFoundException e) {
logger.debug("Could not find schema name in registry by id in: " + schemaPath);
logger.debug("Could not find schema metadata in registry by id and subjects in: " + schemaPath);
}
// Get all couples (subject name, version) for a given schema ID
// GET /schemas/ids/{int: id}/versions
if(completeSchema == null) {
JsonNode subjectsVersions = fetchJsonResponse(schemaPath + "/versions", "schema name");
try {
JsonNode subjectsVersions = fetchJsonResponse(schemaPath + "/versions", "schema name");
if(subjectsVersions != null) {
final ArrayNode subjectsVersionsList = (ArrayNode) subjectsVersions;
// we want to make sure we get the latest version
int maxVersion = 0;
String subjectName = null;
for (JsonNode subjectVersion: subjectsVersionsList) {
int currentVersion = subjectVersion.get(VERSION_FIELD_NAME).asInt();
String currentSubjectName = subjectVersion.get(SUBJECT_FIELD_NAME).asText();
if(currentVersion > maxVersion) {
maxVersion = currentVersion;
subjectName = currentSubjectName;
if(subjectsVersions != null) {
final ArrayNode subjectsVersionsList = (ArrayNode) subjectsVersions;
// we want to make sure we get the latest version
int maxVersion = 0;
String subjectName = null;
for (JsonNode subjectVersion: subjectsVersionsList) {
int currentVersion = subjectVersion.get(VERSION_FIELD_NAME).asInt();
String currentSubjectName = subjectVersion.get(SUBJECT_FIELD_NAME).asText();
if(currentVersion > maxVersion) {
maxVersion = currentVersion;
subjectName = currentSubjectName;
}
}
if(subjectName != null) {
return createRecordSchema(subjectName, maxVersion, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText());
}
}
if(subjectName != null) {
return createRecordSchema(subjectName, maxVersion, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText());
}
} catch (SchemaNotFoundException e) {
logger.debug("Could not find schema metadata in registry by id and versions in: " + schemaPath);
}
}
// Last resort option: we get the full list of subjects and check one by one to get the complete schema info
if(completeSchema == null) {
final JsonNode subjectsAllJson = fetchJsonResponse("/subjects", "subjects array");
final ArrayNode subjectsAllList = (ArrayNode) subjectsAllJson;
for (JsonNode subject: subjectsAllList) {
try {
final String searchName = subject.asText();
completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId);
break;
} catch (SchemaNotFoundException e) {
continue;
try {
final JsonNode subjectsAllJson = fetchJsonResponse("/subjects", "subjects array");
final ArrayNode subjectsAllList = (ArrayNode) subjectsAllJson;
for (JsonNode subject: subjectsAllList) {
try {
final String searchName = subject.asText();
completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId);
break;
} catch (SchemaNotFoundException e) {
continue;
}
}
} catch (SchemaNotFoundException e) {
logger.debug("Could not find schema metadata in registry by iterating through subjects");
}
}