diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java index 14e3e835e3..31d9801c14 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java @@ -33,6 +33,7 @@ import org.glassfish.jersey.client.ClientProperties; import javax.net.ssl.SSLContext; import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -41,8 +42,7 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; + /** *

@@ -64,9 +64,8 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { private static final String VERSION_FIELD_NAME = "version"; private static final String ID_FIELD_NAME = "id"; private static final String SCHEMA_TEXT_FIELD_NAME = "schema"; - - private final ConcurrentMap schemaNameToIdentifierMap = new ConcurrentHashMap<>(); - private final ConcurrentMap schemaIdentifierToNameMap = new ConcurrentHashMap<>(); + private static final String CONTENT_TYPE_HEADER = "Content-Type"; + private static final String SCHEMA_REGISTRY_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"; public RestSchemaRegistryClient(final List baseUrls, final int timeoutMillis, final SSLContext sslContext, final ComponentLog logger) { @@ -100,41 +99,30 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { // To make this more efficient, we will cache a mapping of Schema Name to identifier, so that we can look this up more efficiently. // Check if we have cached the Identifier to Name mapping - final String schemaName = schemaIdentifierToNameMap.get(schemaId); - if (schemaName != null) { - return getSchema(schemaName); - } - final String schemaDescription = "identifier " + schemaId; - final JsonNode schemaNameArray = fetchJsonResponse("/subjects", schemaDescription); - if (!schemaNameArray.isArray()) { - throw new IOException("When determining Subjects that are available, expected a JSON Array but did not receive a valid response"); - } + final String schemaPath = getSchemaPath(schemaId); + final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId); + final JsonNode subjectsJson = fetchJsonResponse("/subjects", "subjects array"); + final ArrayNode subjectsList = (ArrayNode) subjectsJson; - final ArrayNode arrayNode = (ArrayNode) schemaNameArray; - for (final JsonNode node : arrayNode) { - final String nodeName = node.asText(); - - final String schemaPath = getSubjectPath(nodeName); - final JsonNode schemaNode; + JsonNode completeSchema = null; + for (JsonNode subject: subjectsList) { try { - schemaNode = fetchJsonResponse(schemaPath, schemaDescription); - } catch (final SchemaNotFoundException | IOException e) { - logger.warn("Failed to fetch Schema with name '{}' from Confluent Schema Registry; " - + "will skip this schema and continue attempting to retrieve other schemas", new Object[] {nodeName, e}); + final String subjectName = subject.asText(); + completeSchema = postJsonResponse("/subjects/" + subjectName, responseJson, "schema id: " + schemaId); + break; + } catch (SchemaNotFoundException e) { continue; } - final int id = schemaNode.get(ID_FIELD_NAME).asInt(); - schemaNameToIdentifierMap.put(nodeName, id); - schemaIdentifierToNameMap.put(id, nodeName); - - if (id == schemaId) { - return createRecordSchema(schemaNode); - } } - throw new SchemaNotFoundException("Could not find a schema with identifier " + schemaId); + if(completeSchema == null) { + throw new SchemaNotFoundException("could not get schema with id: " + schemaId); + } + + final RecordSchema recordSchema = createRecordSchema(completeSchema); + return recordSchema; } private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException { @@ -159,11 +147,39 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest"; } + private String getSchemaPath(final int schemaId) throws UnsupportedEncodingException { + return "/schemas/ids/" + URLEncoder.encode(String.valueOf(schemaId), "UTF-8"); + } + + private JsonNode postJsonResponse(final String pathSuffix, final JsonNode schema, final String schemaDescription) throws SchemaNotFoundException { + String errorMessage = null; + for(final String baseUrl: baseUrls) { + final String path = getPath(pathSuffix); + final String trimmedBase = getTrimmedBase(baseUrl); + final String url = trimmedBase + path; + final WebTarget builder = client.target(url); + final Response response = builder.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE).post(Entity.json(schema.toString())); + final int responseCode = response.getStatus(); + + if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) { + continue; + } + + if(responseCode == Response.Status.OK.getStatusCode()) { + final JsonNode responseJson = response.readEntity(JsonNode.class); + return responseJson; + } + } + + throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + + errorMessage); + } + private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException { String errorMessage = null; for (final String baseUrl : baseUrls) { - final String path = pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix; - final String trimmedBase = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; + final String path = getPath(pathSuffix); + final String trimmedBase = getTrimmedBase(baseUrl); final String url = trimmedBase + path; final WebTarget webTarget = client.target(url); @@ -187,4 +203,12 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage); } + private String getTrimmedBase(String baseUrl) { + return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; + } + + private String getPath(String pathSuffix) { + return pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix; + } + }