NIFI-12733 Make Apicurio's groupId optional and configurable and use artifactId instead of name as key

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8351.
This commit is contained in:
Juldrixx 2024-02-02 17:44:37 +01:00 committed by Pierre Villard
parent c29a744644
commit ecea18f796
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
14 changed files with 137 additions and 230 deletions

View File

@ -26,6 +26,7 @@ import org.apache.nifi.apicurio.schemaregistry.client.SchemaRegistryClient;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -43,7 +44,7 @@ import java.util.concurrent.TimeUnit;
@Tags({"schema", "registry", "apicurio", "avro"}) @Tags({"schema", "registry", "apicurio", "avro"})
@CapabilityDescription("Provides a Schema Registry that interacts with the Apicurio Schema Registry so that those Schemas that are stored in the Apicurio Schema " @CapabilityDescription("Provides a Schema Registry that interacts with the Apicurio Schema Registry so that those Schemas that are stored in the Apicurio Schema "
+ "Registry can be used in NiFi. When a Schema is looked up by name by this registry, it will find a Schema in the Apicurio Schema Registry with their names.") + "Registry can be used in NiFi. When a Schema is looked up by name by this registry, it will find a Schema in the Apicurio Schema Registry with their artifact identifiers.")
public class ApicurioSchemaRegistry extends AbstractControllerService implements SchemaRegistry { public class ApicurioSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
@ -56,6 +57,15 @@ public class ApicurioSchemaRegistry extends AbstractControllerService implements
.addValidator(StandardValidators.URL_VALIDATOR) .addValidator(StandardValidators.URL_VALIDATOR)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor SCHEMA_GROUP_ID = new PropertyDescriptor.Builder()
.name("Schema Group ID")
.displayName("Schema Group ID")
.description("The artifact Group ID for the schemas")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.defaultValue("default")
.required(true)
.build();
static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
.name("Cache Size") .name("Cache Size")
.displayName("Cache Size") .displayName("Cache Size")
@ -86,6 +96,7 @@ public class ApicurioSchemaRegistry extends AbstractControllerService implements
private static final List<PropertyDescriptor> PROPERTIES = List.of( private static final List<PropertyDescriptor> PROPERTIES = List.of(
SCHEMA_REGISTRY_URL, SCHEMA_REGISTRY_URL,
SCHEMA_GROUP_ID,
CACHE_SIZE, CACHE_SIZE,
CACHE_EXPIRATION, CACHE_EXPIRATION,
WEB_CLIENT_PROVIDER WEB_CLIENT_PROVIDER
@ -102,29 +113,26 @@ public class ApicurioSchemaRegistry extends AbstractControllerService implements
@OnEnabled @OnEnabled
public void onEnabled(final ConfigurationContext context) { public void onEnabled(final ConfigurationContext context) {
final String schemaRegistryUrl = context.getProperty(SCHEMA_REGISTRY_URL).getValue(); final String schemaRegistryUrl = context.getProperty(SCHEMA_REGISTRY_URL).getValue();
final String schemaGroupId = context.getProperty(SCHEMA_GROUP_ID).getValue();
final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS); final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
final WebClientServiceProvider webClientServiceProvider = final WebClientServiceProvider webClientServiceProvider =
context.getProperty(WEB_CLIENT_PROVIDER).asControllerService(WebClientServiceProvider.class); context.getProperty(WEB_CLIENT_PROVIDER).asControllerService(WebClientServiceProvider.class);
final SchemaRegistryApiClient apiClient = new SchemaRegistryApiClient(webClientServiceProvider, schemaRegistryUrl); final SchemaRegistryApiClient apiClient = new SchemaRegistryApiClient(webClientServiceProvider, schemaRegistryUrl, schemaGroupId);
final SchemaRegistryClient schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient); final SchemaRegistryClient schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
client = new CachingSchemaRegistryClient(schemaRegistryClient, cacheSize, cacheExpiration); client = new CachingSchemaRegistryClient(schemaRegistryClient, cacheSize, cacheExpiration);
} }
@Override @Override
public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
final String schemaName = schemaIdentifier.getName().orElseThrow( final String schemaId = schemaIdentifier.getName().orElseThrow(
() -> new SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present") () -> new SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present")
); );
final OptionalInt version = schemaIdentifier.getVersion(); final OptionalInt version = schemaIdentifier.getVersion();
if (version.isPresent()) { return client.getSchema(schemaId, version);
return client.getSchema(schemaName, version.getAsInt());
} else {
return client.getSchema(schemaName);
}
} }
@Override @Override

View File

@ -17,13 +17,13 @@
package org.apache.nifi.apicurio.schemaregistry.client; package org.apache.nifi.apicurio.schemaregistry.client;
import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils; import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils;
import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.ResultAttributes;
import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.util.OptionalInt;
public class ApicurioSchemaRegistryClient implements SchemaRegistryClient { public class ApicurioSchemaRegistryClient implements SchemaRegistryClient {
private final SchemaRegistryApiClient apiClient; private final SchemaRegistryApiClient apiClient;
@ -33,37 +33,20 @@ public class ApicurioSchemaRegistryClient implements SchemaRegistryClient {
} }
@Override @Override
public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException { public RecordSchema getSchema(final String schemaId, final OptionalInt version) throws IOException, SchemaNotFoundException {
final ResultAttributes attributes = getAttributesForSchemaName(schemaName); return createRecordSchemaForAttributes(
final int version = getVersionAttributeFromMetadata(attributes); schemaId,
return createRecordSchemaForAttributes(attributes, version); version
);
} }
@Override private RecordSchema createRecordSchemaForAttributes(final String artifactId, final OptionalInt version) throws IOException, SchemaNotFoundException {
public RecordSchema getSchema(final String schemaName, final int version) throws IOException, SchemaNotFoundException { final URI schemaUri = version.isPresent()
final ResultAttributes attributes = getAttributesForSchemaName(schemaName); ? apiClient.buildSchemaVersionUri(artifactId, version.getAsInt()) :
return createRecordSchemaForAttributes(attributes, version); apiClient.buildSchemaArtifactUri(artifactId);
}
private ResultAttributes getAttributesForSchemaName(String schemaName) throws IOException {
final URI searchUri = apiClient.buildSearchUri(schemaName);
try (final InputStream searchResultStream = apiClient.retrieveResponse(searchUri)) {
return SchemaUtils.getResultAttributes(searchResultStream);
}
}
private int getVersionAttributeFromMetadata(final ResultAttributes attributes) throws IOException {
final URI metaDataUri = apiClient.buildMetaDataUri(attributes.groupId(), attributes.artifactId());
try (final InputStream metadataResultStream = apiClient.retrieveResponse(metaDataUri)) {
return SchemaUtils.extractVersionAttributeFromStream(metadataResultStream);
}
}
private RecordSchema createRecordSchemaForAttributes(ResultAttributes attributes, int version) throws IOException, SchemaNotFoundException {
final URI schemaUri = apiClient.buildSchemaVersionUri(attributes.groupId(), attributes.artifactId(), version);
try (final InputStream schemaResultStream = apiClient.retrieveResponse(schemaUri)) { try (final InputStream schemaResultStream = apiClient.retrieveResponse(schemaUri)) {
return SchemaUtils.createRecordSchema(schemaResultStream, attributes.name(), version); return SchemaUtils.createRecordSchema(schemaResultStream, artifactId, version);
} }
} }
} }

View File

@ -23,10 +23,11 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import java.time.Duration; import java.time.Duration;
import java.util.OptionalInt;
public class CachingSchemaRegistryClient implements SchemaRegistryClient { public class CachingSchemaRegistryClient implements SchemaRegistryClient {
private final SchemaRegistryClient client; private final SchemaRegistryClient client;
private final LoadingCache<Pair<String, Integer>, RecordSchema> schemaCache; private final LoadingCache<Pair<String, OptionalInt>, RecordSchema> schemaCache;
public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, final int cacheSize, final long expirationNanos) { public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, final int cacheSize, final long expirationNanos) {
this.client = toWrap; this.client = toWrap;
@ -34,24 +35,10 @@ public class CachingSchemaRegistryClient implements SchemaRegistryClient {
schemaCache = Caffeine.newBuilder() schemaCache = Caffeine.newBuilder()
.maximumSize(cacheSize) .maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos)) .expireAfterWrite(Duration.ofNanos(expirationNanos))
.build(key -> { .build(key -> client.getSchema(key.getLeft(), key.getRight()));
if (key.getRight() == -1) {
// If the version in the key is -1, fetch the schema by name only.
return client.getSchema(key.getLeft());
} else {
// If a specific version is provided in the key, fetch the schema with that version.
return client.getSchema(key.getLeft(), key.getRight());
} }
});
}
@Override @Override
public RecordSchema getSchema(final String schemaName) { public RecordSchema getSchema(final String schemaName, final OptionalInt version) {
return schemaCache.get(Pair.of(schemaName, -1));
}
@Override
public RecordSchema getSchema(final String schemaName, final int version) {
return schemaCache.get(Pair.of(schemaName, version)); return schemaCache.get(Pair.of(schemaName, version));
} }
} }

View File

@ -26,10 +26,12 @@ public class SchemaRegistryApiClient {
private final WebClientServiceProvider webClientServiceProvider; private final WebClientServiceProvider webClientServiceProvider;
private final String baseUrl; private final String baseUrl;
private final String groupId;
public SchemaRegistryApiClient(final WebClientServiceProvider webClientServiceProvider, final String baseUrl) { public SchemaRegistryApiClient(final WebClientServiceProvider webClientServiceProvider, final String baseUrl, final String groupId) {
this.webClientServiceProvider = webClientServiceProvider; this.webClientServiceProvider = webClientServiceProvider;
this.baseUrl = baseUrl; this.baseUrl = baseUrl;
this.groupId = groupId;
} }
public InputStream retrieveResponse(final URI uri) { public InputStream retrieveResponse(final URI uri) {
@ -51,37 +53,26 @@ public class SchemaRegistryApiClient {
.addPathSegment("v2"); .addPathSegment("v2");
} }
public URI buildSearchUri(final String schemaName) { private HttpUriBuilder buildBaseSchemaUri() {
return buildBaseUri() return buildBaseUri()
.addPathSegment("search") .addPathSegment("groups")
.addPathSegment(this.groupId);
}
private HttpUriBuilder buildBaseSchemaArtifactUri(final String artifactId) {
return buildBaseSchemaUri()
.addPathSegment("artifacts") .addPathSegment("artifacts")
.addQueryParameter("name", schemaName) .addPathSegment(artifactId);
.addQueryParameter("limit", "1")
.build();
} }
public URI buildMetaDataUri(final String groupId, final String artifactId) { public URI buildSchemaArtifactUri(final String artifactId) {
return buildGroupArtifactsUri(groupId, artifactId) return buildBaseSchemaArtifactUri(artifactId).build();
.addPathSegment("meta")
.build();
} }
public URI buildSchemaUri(final String groupId, final String artifactId) { public URI buildSchemaVersionUri(final String artifactId, final int version) {
return buildGroupArtifactsUri(groupId, artifactId).build(); return buildBaseSchemaArtifactUri(artifactId)
}
public URI buildSchemaVersionUri(final String groupId, final String artifactId, final int version) {
return buildGroupArtifactsUri(groupId, artifactId)
.addPathSegment("versions") .addPathSegment("versions")
.addPathSegment(String.valueOf(version)) .addPathSegment(String.valueOf(version))
.build(); .build();
} }
private HttpUriBuilder buildGroupArtifactsUri(final String groupId, final String artifactId) {
return buildBaseUri()
.addPathSegment("groups")
.addPathSegment(groupId)
.addPathSegment("artifacts")
.addPathSegment(artifactId);
}
} }

View File

@ -20,12 +20,10 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException; import java.io.IOException;
import java.util.OptionalInt;
public interface SchemaRegistryClient { public interface SchemaRegistryClient {
RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException; RecordSchema getSchema(final String schemaId, final OptionalInt version) throws IOException, SchemaNotFoundException;
RecordSchema getSchema(final String schemaName, final int version) throws IOException, SchemaNotFoundException;
} }

View File

@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.UncheckedIOException; import java.util.OptionalInt;
public class SchemaUtils { public class SchemaUtils {
@ -39,50 +39,28 @@ public class SchemaUtils {
private SchemaUtils() { private SchemaUtils() {
} }
public static RecordSchema createRecordSchema(final InputStream schemaStream, final String name, final int version) throws SchemaNotFoundException, IOException { public static RecordSchema createRecordSchema(final InputStream schemaStream, final String artifactId, final OptionalInt version) throws SchemaNotFoundException, IOException {
final JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaStream); final JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaStream);
final String schemaText = schemaNode.toString(); final String schemaText = schemaNode.toString();
try { try {
final Schema avroSchema = new Schema.Parser().parse(schemaText); final Schema avroSchema = new Schema.Parser().parse(schemaText);
final SchemaIdentifier schemaId = SchemaIdentifier.builder() final SchemaIdentifier.Builder schemaIdBuilder = SchemaIdentifier.builder()
.name(name) .name(artifactId);
.version(version)
.build(); if (version.isPresent()) {
schemaIdBuilder.version(version.getAsInt());
}
final SchemaIdentifier schemaId = schemaIdBuilder.build();
return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId); return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
} catch (final SchemaParseException e) { } catch (final SchemaParseException e) {
final String errorMessage = String.format("Obtained Schema with name [%s] from Apicurio Schema Registry but the Schema Text " + final String errorMessage = String.format("Obtained Schema with name [%s] from Apicurio Schema Registry but the Schema Text " +
"that was returned is not a valid Avro Schema", name); "that was returned is not a valid Avro Schema", artifactId);
throw new SchemaNotFoundException(errorMessage, e); throw new SchemaNotFoundException(errorMessage, e);
} }
} }
public static int extractVersionAttributeFromStream(InputStream in) {
final JsonNode metadataNode;
try {
metadataNode = OBJECT_MAPPER.readTree(in);
} catch (IOException e) {
throw new UncheckedIOException("Failed to read version from HTTP response stream", e);
}
return Integer.parseInt(metadataNode.get("version").asText());
}
public static ResultAttributes getResultAttributes(InputStream in) {
final JsonNode jsonNode;
try {
jsonNode = OBJECT_MAPPER.readTree(in);
} catch (IOException e) {
throw new UncheckedIOException("Failed to read result attributes from HTTP response stream", e);
}
final JsonNode artifactNode = jsonNode.get("artifacts").get(0);
final String groupId = artifactNode.get("groupId").asText();
final String artifactId = artifactNode.get("id").asText();
final String name = artifactNode.get("name").asText();
return new ResultAttributes(groupId, artifactId, name);
}
public record ResultAttributes(String groupId, String artifactId, String name) {
}
static ObjectMapper setObjectMapper() { static ObjectMapper setObjectMapper() {
return new ObjectMapper(); return new ObjectMapper();

View File

@ -19,7 +19,6 @@ package org.apache.nifi.apicurio.schemaregistry.client;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -29,6 +28,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.OptionalInt;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@ -39,55 +39,45 @@ import static org.mockito.Mockito.verify;
class ApicurioSchemaRegistryClientTest { class ApicurioSchemaRegistryClientTest {
private static final String TEST_URL = "http://test.apicurio-schema-registry.com:8888"; private static final String TEST_URL = "http://test.apicurio-schema-registry.com:8888";
private static final String SCHEMA_NAME = "schema1";
private static final int VERSION = 3;
private static final String SEARCH_URL = TEST_URL + "/search";
private static final String METADATA_URL = TEST_URL + "/meta";
private static final String SCHEMA_VERSION_URL = TEST_URL + "/schema/versions/" + VERSION;
private static final String GROUP_ID = "groupId1";
private static final String ARTIFACT_ID = "artifactId1"; private static final String ARTIFACT_ID = "artifactId1";
private static final int VERSION = 3;
private static final String SCHEMA_ARTIFACT_URL = TEST_URL + "/schema/" + ARTIFACT_ID;
private static final String SCHEMA_VERSION_URL = TEST_URL + "/schema/versions/" + VERSION;
@Mock @Mock
private SchemaRegistryApiClient apiClient; private SchemaRegistryApiClient apiClient;
private ApicurioSchemaRegistryClient schemaRegistryClient; private ApicurioSchemaRegistryClient schemaRegistryClient;
@BeforeEach
void setup() {
doReturn(URI.create(SEARCH_URL)).when(apiClient).buildSearchUri(SCHEMA_NAME);
doReturn(URI.create(SCHEMA_VERSION_URL)).when(apiClient).buildSchemaVersionUri(GROUP_ID, ARTIFACT_ID, VERSION);
doReturn(getResource("search_response.json")).when(apiClient).retrieveResponse(URI.create(SEARCH_URL));
doReturn(getResource("schema_response.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_VERSION_URL));
}
@Test @Test
void testGetSchemaWithNameInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException { void testGetSchemaWithIdInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
doReturn(URI.create(METADATA_URL)).when(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID); doReturn(URI.create(SCHEMA_ARTIFACT_URL)).when(apiClient).buildSchemaArtifactUri(ARTIFACT_ID);
doReturn(getResource("metadata_response.json")).when(apiClient).retrieveResponse(URI.create(METADATA_URL)); doReturn(getResource("schema_response_version_latest.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_ARTIFACT_URL));
schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient); schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
final RecordSchema schema = schemaRegistryClient.getSchema(SCHEMA_NAME); final RecordSchema schema = schemaRegistryClient.getSchema(ARTIFACT_ID, OptionalInt.empty());
verify(apiClient).buildSearchUri(SCHEMA_NAME); verify(apiClient).buildSchemaArtifactUri(ARTIFACT_ID);
verify(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID); verify(apiClient, never()).buildSchemaVersionUri(ARTIFACT_ID, VERSION);
verify(apiClient).buildSchemaVersionUri(GROUP_ID, ARTIFACT_ID, VERSION);
final String expectedSchemaText = IOUtils.toString(getResource("schema_response.json"), Charset.defaultCharset()) final String expectedSchemaText = IOUtils.toString(getResource("schema_response_version_latest.json"), Charset.defaultCharset())
.replace("\n", "") .replace("\n", "")
.replaceAll(" +", ""); .replaceAll(" +", "");
assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty"))); assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty")));
} }
@Test @Test
void testGetSchemaWithNameAndVersionInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException { void testGetSchemaWithIdAndVersionInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
doReturn(URI.create(SCHEMA_VERSION_URL)).when(apiClient).buildSchemaVersionUri(ARTIFACT_ID, VERSION);
doReturn(getResource("schema_response_version_3.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_VERSION_URL));
schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient); schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
final RecordSchema schema = schemaRegistryClient.getSchema(SCHEMA_NAME, 3); final RecordSchema schema = schemaRegistryClient.getSchema(ARTIFACT_ID, OptionalInt.of(VERSION));
verify(apiClient).buildSearchUri(SCHEMA_NAME); verify(apiClient, never()).buildSchemaArtifactUri(ARTIFACT_ID);
verify(apiClient, never()).buildMetaDataUri(GROUP_ID, ARTIFACT_ID); verify(apiClient).buildSchemaVersionUri(ARTIFACT_ID, VERSION);
verify(apiClient).buildSchemaVersionUri(GROUP_ID, ARTIFACT_ID, VERSION);
final String expectedSchemaText = IOUtils.toString(getResource("schema_response.json"), Charset.defaultCharset()) final String expectedSchemaText = IOUtils.toString(getResource("schema_response_version_3.json"), Charset.defaultCharset())
.replace("\n", "") .replace("\n", "")
.replaceAll(" +", ""); .replaceAll(" +", "");
assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty"))); assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty")));

View File

@ -29,6 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -60,35 +61,36 @@ class CachingSchemaRegistryClientTest {
@Test @Test
void testGetSchemaWithNameInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException { void testGetSchemaWithNameInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException {
when(mockClient.getSchema(SCHEMA_NAME)).thenReturn(TEST_SCHEMA); final OptionalInt version = OptionalInt.empty();
RecordSchema actualSchema1 = cachingClient.getSchema(SCHEMA_NAME); when(mockClient.getSchema(SCHEMA_NAME, version)).thenReturn(TEST_SCHEMA);
RecordSchema actualSchema2 = cachingClient.getSchema(SCHEMA_NAME);
RecordSchema actualSchema1 = cachingClient.getSchema(SCHEMA_NAME, version);
RecordSchema actualSchema2 = cachingClient.getSchema(SCHEMA_NAME, version);
assertEquals(TEST_SCHEMA, actualSchema1); assertEquals(TEST_SCHEMA, actualSchema1);
assertEquals(TEST_SCHEMA, actualSchema2); assertEquals(TEST_SCHEMA, actualSchema2);
verify(mockClient).getSchema(SCHEMA_NAME); verify(mockClient).getSchema(SCHEMA_NAME, version);
} }
@Test @Test
void testGetSchemaWithNameAndVersionInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException { void testGetSchemaWithNameAndVersionInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException {
String schemaName = "schema"; final OptionalInt version = OptionalInt.of(1);
int version = 1;
when(mockClient.getSchema(schemaName, version)).thenReturn(TEST_SCHEMA); when(mockClient.getSchema(SCHEMA_NAME, version)).thenReturn(TEST_SCHEMA);
RecordSchema actualSchema1 = cachingClient.getSchema(schemaName, version); RecordSchema actualSchema1 = cachingClient.getSchema(SCHEMA_NAME, version);
RecordSchema actualSchema2 = cachingClient.getSchema(schemaName, version); RecordSchema actualSchema2 = cachingClient.getSchema(SCHEMA_NAME, version);
assertEquals(TEST_SCHEMA, actualSchema1); assertEquals(TEST_SCHEMA, actualSchema1);
assertEquals(TEST_SCHEMA, actualSchema2); assertEquals(TEST_SCHEMA, actualSchema2);
verify(mockClient).getSchema(schemaName, version); verify(mockClient).getSchema(SCHEMA_NAME, version);
} }
@Test @Test
void testGetSchemaWithNameAndVersionDoesNotCacheDifferentVersions() throws IOException, SchemaNotFoundException { void testGetSchemaWithNameAndVersionDoesNotCacheDifferentVersions() throws IOException, SchemaNotFoundException {
int version1 = 1; final OptionalInt version1 = OptionalInt.of(1);
int version2 = 2; final OptionalInt version2 = OptionalInt.of(2);
RecordSchema expectedSchema1 = TEST_SCHEMA; RecordSchema expectedSchema1 = TEST_SCHEMA;
RecordSchema expectedSchema2 = TEST_SCHEMA_2; RecordSchema expectedSchema2 = TEST_SCHEMA_2;

View File

@ -35,12 +35,12 @@ class SchemaRegistryApiClientTest {
private static final String BASE_URL = "http://test.apicurio-schema-registry.com:8888"; private static final String BASE_URL = "http://test.apicurio-schema-registry.com:8888";
private static final String API_PATH = "/apis/registry/v2"; private static final String API_PATH = "/apis/registry/v2";
private static final String METADATA_PATH = "/meta";
private static final String GROUP_ID = "groupId1"; private static final String GROUP_ID = "groupId1";
private static final String ARTIFACT_ID = "artifactId1"; private static final String ARTIFACT_ID = "artifactId1";
private static final String SCHEMA_PATH = String.format("/groups/%s/artifacts/%s", GROUP_ID, ARTIFACT_ID); private static final int VERSION = 3;
private static final String SCHEMA_NAME = "schema1"; private static final String GROUP_PATH = String.format("/groups/%s", GROUP_ID);
private static final String SEARCH_PATH = String.format("/search/artifacts?name=%s&limit=1", SCHEMA_NAME); private static final String ARTIFACT_PATH = String.format("/artifacts/%s", ARTIFACT_ID);
private static final String VERSION_PATH = String.format("/versions/%d", VERSION);
@Mock @Mock
private WebClientServiceProvider webClientServiceProvider; private WebClientServiceProvider webClientServiceProvider;
@ -53,7 +53,7 @@ class SchemaRegistryApiClientTest {
@Test @Test
void testBuildBaseUrl() { void testBuildBaseUrl() {
client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL); client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL, GROUP_ID);
final HttpUriBuilder httpUriBuilder = client.buildBaseUri(); final HttpUriBuilder httpUriBuilder = client.buildBaseUri();
@ -61,30 +61,20 @@ class SchemaRegistryApiClientTest {
} }
@Test @Test
void testBuildSearchUri() { void testBuildSchemaArtifactUri() {
client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL); client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL, GROUP_ID);
final URI uri = client.buildSearchUri(SCHEMA_NAME); final URI uri = client.buildSchemaArtifactUri(ARTIFACT_ID);
assertEquals(BASE_URL + API_PATH + SEARCH_PATH, uri.toString()); assertEquals(BASE_URL + API_PATH + GROUP_PATH + ARTIFACT_PATH, uri.toString());
} }
@Test @Test
void testBuildMetadataUri() { void testBuildSchemaVersionUri() {
client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL); client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL, GROUP_ID);
final URI uri = client.buildMetaDataUri(GROUP_ID, ARTIFACT_ID); final URI uri = client.buildSchemaVersionUri(ARTIFACT_ID, VERSION);
assertEquals(BASE_URL + API_PATH + SCHEMA_PATH + METADATA_PATH, uri.toString()); assertEquals(BASE_URL + API_PATH + GROUP_PATH + ARTIFACT_PATH + VERSION_PATH, uri.toString());
} }
@Test
void testBuildSchemaUri() {
client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL);
final URI uri = client.buildSchemaUri(GROUP_ID, ARTIFACT_ID);
assertEquals(BASE_URL + API_PATH + SCHEMA_PATH, uri.toString());
}
} }

View File

@ -17,7 +17,6 @@
package org.apache.nifi.apicurio.schemaregistry.util; package org.apache.nifi.apicurio.schemaregistry.util;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.ResultAttributes;
import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -25,6 +24,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.OptionalInt;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -34,7 +34,7 @@ class SchemaUtilsTest {
void testCreateRecordSchema() throws SchemaNotFoundException, IOException { void testCreateRecordSchema() throws SchemaNotFoundException, IOException {
final InputStream in = getResource("schema_response.json"); final InputStream in = getResource("schema_response.json");
final RecordSchema schema = SchemaUtils.createRecordSchema(in, "schema1", 3); final RecordSchema schema = SchemaUtils.createRecordSchema(in, "schema1", OptionalInt.of(3));
assertEquals("schema1", schema.getSchemaName().orElseThrow(() -> new AssertionError("Schema Name is empty"))); assertEquals("schema1", schema.getSchemaName().orElseThrow(() -> new AssertionError("Schema Name is empty")));
assertEquals("schema_namespace_1", schema.getSchemaNamespace().orElseThrow(() -> new AssertionError("Schema Namespace is empty"))); assertEquals("schema_namespace_1", schema.getSchemaNamespace().orElseThrow(() -> new AssertionError("Schema Namespace is empty")));
@ -46,25 +46,6 @@ class SchemaUtilsTest {
assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty"))); assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty")));
} }
@Test
void testGetVersionAttribute() {
final InputStream in = getResource("metadata_response.json");
int version = SchemaUtils.extractVersionAttributeFromStream(in);
assertEquals(3, version);
}
@Test
void testGetResultAttributes() {
final InputStream in = getResource("search_response.json");
final ResultAttributes resultAttributes = SchemaUtils.getResultAttributes(in);
final ResultAttributes expectedAttributes = new ResultAttributes("groupId1", "artifactId1", "schema1");
assertEquals(expectedAttributes, resultAttributes);
}
private InputStream getResource(final String resourceName) { private InputStream getResource(final String resourceName) {
return this.getClass().getClassLoader().getResourceAsStream(resourceName); return this.getClass().getClassLoader().getResourceAsStream(resourceName);
} }

View File

@ -1,15 +0,0 @@
{
"name": "schema1",
"createdBy": "",
"createdOn": "2023-10-16T14:19:21+0000",
"modifiedBy": "",
"modifiedOn": "2023-10-16T14:53:12+0000",
"id": "artifactId1",
"version": "3",
"type": "AVRO",
"globalId": 3,
"state": "ENABLED",
"groupId": "groupId1",
"contentId": 2,
"references": []
}

View File

@ -0,0 +1,15 @@
{
"type": "record",
"namespace": "schema_namespace",
"name": "schema_version_3",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
]
}

View File

@ -0,0 +1,15 @@
{
"type": "record",
"namespace": "schema_namespace",
"name": "schema_version_latest",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
]
}

View File

@ -1,16 +0,0 @@
{
"artifacts": [
{
"id": "artifactId1",
"name": "schema1",
"createdOn": "2023-10-16T14:19:21+0000",
"createdBy": "",
"type": "AVRO",
"state": "ENABLED",
"modifiedOn": "2023-10-16T14:19:31+0000",
"modifiedBy": "",
"groupId": "groupId1"
}
],
"count": 1
}