diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java index 318d2d7473..33e74bc186 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java @@ -122,34 +122,71 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { // Check if we have cached the Identifier to Name mapping + JsonNode completeSchema = null; + + // We get the schema definition using the ID of the schema + // GET /schemas/ids/{int: id} final String schemaPath = getSchemaPath(schemaId); - final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId); - //Get subject name by id, works only with v5.3.1+ Confluent Schema Registry + final JsonNode schemaJson = fetchJsonResponse(schemaPath, "id " + schemaId); + + // Get subject name by id, works only with v5.3.1+ Confluent Schema Registry + // GET /schemas/ids/{int: id}/subjects JsonNode subjectsJson = null; try { subjectsJson = fetchJsonResponse(schemaPath + "/subjects", "schema name"); + + if(subjectsJson != null) { + final ArrayNode subjectsList = (ArrayNode) subjectsJson; + for (JsonNode subject: subjectsList) { + final String searchName = subject.asText(); + try { + // get complete schema (name + id + version) using the subject name API + completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId); + break; + } catch (SchemaNotFoundException e) { + logger.debug("Could not find schema in registry by subject name " + searchName, e); + continue; + } + } + } + } catch (SchemaNotFoundException e) { - logger.debug("Could not find schema name in registry by id in: + " + schemaPath); + logger.debug("Could not find schema name in registry by id in: " + schemaPath); } - JsonNode completeSchema = null; - if(subjectsJson == null) { + + // Get all couples (subject name, version) for a given schema ID + // GET /schemas/ids/{int: id}/versions + if(completeSchema == null) { + JsonNode subjectsVersions = fetchJsonResponse(schemaPath + "/versions", "schema name"); + + if(subjectsVersions != null) { + final ArrayNode subjectsVersionsList = (ArrayNode) subjectsVersions; + // we want to make sure we get the latest version + int maxVersion = 0; + String subjectName = null; + for (JsonNode subjectVersion: subjectsVersionsList) { + int currentVersion = subjectVersion.get(VERSION_FIELD_NAME).asInt(); + String currentSubjectName = subjectVersion.get(SUBJECT_FIELD_NAME).asText(); + if(currentVersion > maxVersion) { + maxVersion = currentVersion; + subjectName = currentSubjectName; + } + } + + if(subjectName != null) { + return createRecordSchema(subjectName, maxVersion, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText()); + } + } + } + + // Last resort option: we get the full list of subjects and check one by one to get the complete schema info + if(completeSchema == null) { final JsonNode subjectsAllJson = fetchJsonResponse("/subjects", "subjects array"); final ArrayNode subjectsAllList = (ArrayNode) subjectsAllJson; for (JsonNode subject: subjectsAllList) { try { final String searchName = subject.asText(); - completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId); - break; - } catch (SchemaNotFoundException e) { - continue; - } - } - } else { - final ArrayNode subjectsList = (ArrayNode) subjectsJson; - for (JsonNode subject: subjectsList) { - try { - final String searchName = subject.asText(); - completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId); + completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId); break; } catch (SchemaNotFoundException e) { continue; @@ -158,12 +195,23 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { } if(completeSchema == null) { - throw new SchemaNotFoundException("could not get schema with id: " + schemaId); + throw new SchemaNotFoundException("Could not get schema with id: " + schemaId); } return createRecordSchema(completeSchema); } + private RecordSchema createRecordSchema(final String name, final int version, final int id, final String schema) throws SchemaNotFoundException { + try { + final Schema avroSchema = new Schema.Parser().parse(schema); + final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(name).id((long) id).version(version).build(); + return AvroTypeUtil.createSchema(avroSchema, schema, schemaId); + } catch (final SchemaParseException spe) { + throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + name + + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema"); + } + } + private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException { final String subject = schemaNode.get(SUBJECT_FIELD_NAME).asText(); final int version = schemaNode.get(VERSION_FIELD_NAME).asInt(); @@ -173,7 +221,6 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { try { final Schema avroSchema = new Schema.Parser().parse(schemaText); final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id((long) id).version(version).build(); - return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId); } catch (final SchemaParseException spe) { throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject @@ -196,6 +243,9 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { final String path = getPath(pathSuffix); final String trimmedBase = getTrimmedBase(baseUrl); final String url = trimmedBase + path; + + logger.debug("POST JSON response URL {}", url); + final WebTarget webTarget = client.target(url); Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE); for (Map.Entry header : httpHeaders.entrySet()) { @@ -204,26 +254,40 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { final Response response = builder.post(Entity.json(schema.toString())); final int responseCode = response.getStatus(); - if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) { - continue; - } + switch (Response.Status.fromStatusCode(responseCode)) { + case OK: + JsonNode jsonResponse = response.readEntity(JsonNode.class); - if(responseCode == Response.Status.OK.getStatusCode()) { - return response.readEntity(JsonNode.class); + if (logger.isDebugEnabled()) { + logger.debug("JSON Response: {}", jsonResponse); + } + + return jsonResponse; + + case NOT_FOUND: + logger.debug("Could not find Schema {} from Registry {}", schemaDescription, baseUrl); + continue; + + default: + errorMessage = response.readEntity(String.class); + continue; } } - throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage); } - private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException { + private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException { String errorMessage = null; for (final String baseUrl : baseUrls) { final String path = getPath(pathSuffix); final String trimmedBase = getTrimmedBase(baseUrl); final String url = trimmedBase + path; + logger.debug("GET JSON response URL {}", url); + final WebTarget webTarget = client.target(url); Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON); for (Map.Entry header : httpHeaders.entrySet()) { @@ -232,20 +296,28 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { final Response response = builder.get(); final int responseCode = response.getStatus(); - if (responseCode == Response.Status.OK.getStatusCode()) { - return response.readEntity(JsonNode.class); - } + switch (Response.Status.fromStatusCode(responseCode)) { + case OK: + JsonNode jsonResponse = response.readEntity(JsonNode.class); - if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) { - throw new SchemaNotFoundException("Could not find Schema with " + schemaDescription + " from the Confluent Schema Registry located at " + baseUrl); - } + if (logger.isDebugEnabled()) { + logger.debug("JSON Response {}", jsonResponse); + } - if (errorMessage == null) { - errorMessage = response.readEntity(String.class); + return jsonResponse; + + case NOT_FOUND: + logger.debug("Could not find Schema {} from Registry {}", schemaDescription, baseUrl); + continue; + + default: + errorMessage = response.readEntity(String.class); + continue; } } - throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage); + throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage); } private String getTrimmedBase(String baseUrl) {