NIFI-4777: get schema by id even if not latest

This closes #2405.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Internet 2018-01-15 14:34:41 +02:00 committed by Mark Payne
parent 4687e0fb35
commit 1091093133
1 changed files with 58 additions and 34 deletions

View File

@ -33,6 +33,7 @@ import org.glassfish.jersey.client.ClientProperties;
import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -41,8 +42,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* <p>
@ -64,9 +64,8 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
private static final String VERSION_FIELD_NAME = "version";
private static final String ID_FIELD_NAME = "id";
private static final String SCHEMA_TEXT_FIELD_NAME = "schema";
private final ConcurrentMap<String, Integer> schemaNameToIdentifierMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, String> schemaIdentifierToNameMap = new ConcurrentHashMap<>();
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final String SCHEMA_REGISTRY_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";
public RestSchemaRegistryClient(final List<String> baseUrls, final int timeoutMillis, final SSLContext sslContext, final ComponentLog logger) {
@ -100,41 +99,30 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
// To make this more efficient, we will cache a mapping of Schema Name to identifier, so that we can look this up more efficiently.
// Check if we have cached the Identifier to Name mapping
final String schemaName = schemaIdentifierToNameMap.get(schemaId);
if (schemaName != null) {
return getSchema(schemaName);
}
final String schemaDescription = "identifier " + schemaId;
final JsonNode schemaNameArray = fetchJsonResponse("/subjects", schemaDescription);
if (!schemaNameArray.isArray()) {
throw new IOException("When determining Subjects that are available, expected a JSON Array but did not receive a valid response");
}
final String schemaPath = getSchemaPath(schemaId);
final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId);
final JsonNode subjectsJson = fetchJsonResponse("/subjects", "subjects array");
final ArrayNode subjectsList = (ArrayNode) subjectsJson;
final ArrayNode arrayNode = (ArrayNode) schemaNameArray;
for (final JsonNode node : arrayNode) {
final String nodeName = node.asText();
final String schemaPath = getSubjectPath(nodeName);
final JsonNode schemaNode;
JsonNode completeSchema = null;
for (JsonNode subject: subjectsList) {
try {
schemaNode = fetchJsonResponse(schemaPath, schemaDescription);
} catch (final SchemaNotFoundException | IOException e) {
logger.warn("Failed to fetch Schema with name '{}' from Confluent Schema Registry; "
+ "will skip this schema and continue attempting to retrieve other schemas", new Object[] {nodeName, e});
final String subjectName = subject.asText();
completeSchema = postJsonResponse("/subjects/" + subjectName, responseJson, "schema id: " + schemaId);
break;
} catch (SchemaNotFoundException e) {
continue;
}
final int id = schemaNode.get(ID_FIELD_NAME).asInt();
schemaNameToIdentifierMap.put(nodeName, id);
schemaIdentifierToNameMap.put(id, nodeName);
if (id == schemaId) {
return createRecordSchema(schemaNode);
}
}
throw new SchemaNotFoundException("Could not find a schema with identifier " + schemaId);
if(completeSchema == null) {
throw new SchemaNotFoundException("could not get schema with id: " + schemaId);
}
final RecordSchema recordSchema = createRecordSchema(completeSchema);
return recordSchema;
}
private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
@ -159,11 +147,39 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest";
}
private String getSchemaPath(final int schemaId) throws UnsupportedEncodingException {
return "/schemas/ids/" + URLEncoder.encode(String.valueOf(schemaId), "UTF-8");
}
private JsonNode postJsonResponse(final String pathSuffix, final JsonNode schema, final String schemaDescription) throws SchemaNotFoundException {
String errorMessage = null;
for(final String baseUrl: baseUrls) {
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;
final WebTarget builder = client.target(url);
final Response response = builder.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE).post(Entity.json(schema.toString()));
final int responseCode = response.getStatus();
if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
continue;
}
if(responseCode == Response.Status.OK.getStatusCode()) {
final JsonNode responseJson = response.readEntity(JsonNode.class);
return responseJson;
}
}
throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: "
+ errorMessage);
}
private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException {
String errorMessage = null;
for (final String baseUrl : baseUrls) {
final String path = pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix;
final String trimmedBase = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;
final WebTarget webTarget = client.target(url);
@ -187,4 +203,12 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage);
}
private String getTrimmedBase(String baseUrl) {
return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
}
private String getPath(String pathSuffix) {
return pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix;
}
}