NIFI-11827 Added AWS Glue Schema Registry Service

This closes #7492

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Lehel 2023-07-14 16:07:57 +02:00 committed by exceptionfactory
parent 8f5392dd11
commit 8a5f7f0092
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
11 changed files with 808 additions and 2 deletions

View File

@ -51,6 +51,11 @@
<artifactId>nifi-aws-parameter-value-providers</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-schema-registry-service</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>

View File

@ -51,6 +51,11 @@ The following binary components are provided under the Apache Software License v
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
(ASLv2) Apache Commons BeanUtils
The following NOTICE information applies:
Apache Commons BeanUtils
@ -59,6 +64,45 @@ The following binary components are provided under the Apache Software License v
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
(ASLv2) Apache Commons Compress
The following NOTICE information applies:
Apache Commons Compress
Copyright 2002-2017 The Apache Software Foundation
The files in the package org.apache.commons.compress.archivers.sevenz
were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
which has been placed in the public domain:
"LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
(ASLv2) Apache Commons CSV
The following NOTICE information applies:
Apache Commons CSV
Copyright 2005-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
(ASLv2) Apache Commons Text
The following NOTICE information applies:
Apache Commons Text
Copyright 2001-2018 The Apache Software Foundation
(ASLv2) Apache HttpClient
The following NOTICE information applies:
Apache HttpClient
Copyright 1999-2022 The Apache Software Foundation
(ASLv2) Apache HttpCore
The following NOTICE information applies:
Apache HttpCore
Copyright 2005-2022 The Apache Software Foundation
(ASLv2) Amazon Ion Java
The following NOTICE information applies:
Amazon Ion Java
Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
(ASLv2) Amazon Web Services SDK
The following NOTICE information applies:
Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
@ -96,7 +140,6 @@ The following binary components are provided under the Apache Software License v
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.
(ASLv2) This includes derived works from apigateway-generic-java-sdk project (https://github.com/rpgreen/apigateway-generic-java-sdk)
https://github.com/rpgreen/apigateway-generic-java-sdk/commit/32eea44cc855a530c9b4a28b9f3601a41bc85618 as the point reference:
The derived work is adapted from
@ -124,4 +167,38 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Caffeine (com.github.ben-manes.caffeine:caffeine:jar:2.9.2 - https://github.com/ben-manes/caffeine)
The following NOTICE information applies:
Caffeine (caching library)
Copyright Ben Manes
Copyright Ben Manes
(ASLv2) Error Prone Annotations
The following NOTICE information applies:
Error Prone Annotations
Copyright 2015 The Error Prone Authors
************************
Eclipse Distribution License 1.0
************************
The following binary components are provided under the Eclipse Distribution License 1.0.
(EDL 1.0) Jakarta Activation API (jakarta.activation:jakarta.activation-api:jar:1.2.2)
(EDL 1.0) Jakarta XML Binding API (jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.3)
************************
The MIT License
************************
The following binary components are provided under the MIT License. See project link for details.
(MIT License) Checker Qual
The following NOTICE information applies:
Copyright (c) Copyright 2004-present by the Checker Framework developers
All rights reserved.
https://www.checkerframework.org/
*****************
Public Domain
*****************
The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details.
(CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3 - https://github.com/reactive-streams/reactive-streams-jvm)

View File

@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- This module is a child of nifi-aws-bundle and inherits its configuration from that parent. -->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-schema-registry-service</artifactId>
<packaging>jar</packaging>
<!-- NOTE(review): dependencies declared without a <version> presumably get it from
     dependencyManagement in a parent POM; confirm before changing any of them. -->
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<!-- AWS SDK v2: Glue service client and the Apache-based HTTP client implementation. -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
</dependency>
<!-- NiFi controller service APIs referenced by the schema registry service. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<!-- provided scope: needed to compile, but expected to be supplied by the runtime rather than packaged with this jar. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<!-- Test-only dependency. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.aws.schemaregistry;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.aws.schemaregistry.client.CachingSchemaRegistryClient;
import org.apache.nifi.aws.schemaregistry.client.GlueSchemaRegistryClient;
import org.apache.nifi.aws.schemaregistry.client.SchemaRegistryClient;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.ssl.SSLContextService;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.FileStoreTlsKeyManagersProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.TlsKeyManagersProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.GlueClientBuilder;
import javax.net.ssl.TrustManager;
import java.io.IOException;
import java.net.Proxy;
import java.net.URI;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * Controller Service that looks up record schemas in an AWS Glue Schema Registry.
 *
 * <p>When enabled, the service builds a {@link GlueClient} from the configured region, timeout,
 * optional credentials provider, optional SSL context and optional HTTP proxy, wraps it in a
 * {@link GlueSchemaRegistryClient} for the configured registry name, and puts a
 * {@link CachingSchemaRegistryClient} in front of it using the configured cache size and expiration.</p>
 *
 * <p>NOTE(review): nothing in this class closes the {@link GlueClient} or the {@link SdkHttpClient}
 * when the service is disabled; consider adding an {@code @OnDisabled} hook that closes both.</p>
 */
@Tags({"schema", "registry", "aws", "avro", "glue"})
@CapabilityDescription("Provides a Schema Registry that interacts with the AWS Glue Schema Registry so that those Schemas that are stored in the Glue Schema "
        + "Registry can be used in NiFi. When a Schema is looked up by name by this registry, it will find a Schema in the Glue Schema Registry with their names.")
public class AmazonGlueSchemaRegistry extends AbstractControllerService implements SchemaRegistry {

    /** Schema fields reported by {@link #getSuppliedSchemaFields()}. */
    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
            SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);

    static final PropertyDescriptor SCHEMA_REGISTRY_NAME = new PropertyDescriptor.Builder()
            .name("schema-registry-name")
            .displayName("Schema Registry Name")
            .description("The name of the Schema Registry")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .build();

    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
            .name("region")
            .displayName("Region")
            .description("The region of the cloud resources")
            .required(true)
            .allowableValues(getAvailableRegions())
            .defaultValue(createAllowableValue(Region.US_WEST_2).getValue())
            .build();

    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
            .name("cache-size")
            .displayName("Cache Size")
            .description("Specifies how many Schemas should be cached from the Schema Registry")
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .defaultValue("1000")
            .required(true)
            .build();

    static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
            .name("cache-expiration")
            .displayName("Cache Expiration")
            .description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a "
                    + "cached version of a schema will no longer be used, and the service will have to communicate with the "
                    + "Schema Registry again in order to obtain the schema.")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("1 hour")
            .required(true)
            .build();

    static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
            .name("communications-timeout")
            .displayName("Communications Timeout")
            .description("Specifies how long to wait to receive data from the Schema Registry before considering the communications a failure")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .defaultValue("30 secs")
            .required(true)
            .build();

    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
            .name("aws-credentials-provider-service")
            .displayName("AWS Credentials Provider Service")
            .description("The Controller Service that is used to obtain AWS credentials provider")
            .required(false)
            .identifiesControllerService(AWSCredentialsProviderService.class)
            .build();

    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("ssl-context-service")
            .displayName("SSL Context Service")
            .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
            .required(false)
            .identifiesControllerService(SSLContextService.class)
            .build();

    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
    private static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);

    private static final List<PropertyDescriptor> PROPERTIES = new ArrayList<>(Arrays.asList(
            SCHEMA_REGISTRY_NAME,
            REGION,
            COMMUNICATIONS_TIMEOUT,
            CACHE_SIZE,
            CACHE_EXPIRATION,
            AWS_CREDENTIALS_PROVIDER_SERVICE,
            PROXY_CONFIGURATION_SERVICE,
            SSL_CONTEXT_SERVICE
    ));

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /** Client used by {@link #retrieveSchema(SchemaIdentifier)}; assigned in {@link #onEnabled(ConfigurationContext)}. */
    private volatile SchemaRegistryClient client;

    /**
     * Builds the Glue client and the caching schema registry client from the service configuration.
     *
     * @param context configuration of this controller service
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        final String schemaRegistryName = context.getProperty(SCHEMA_REGISTRY_NAME).getValue();
        final String region = context.getProperty(REGION).getValue();
        final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
        final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);

        final GlueClientBuilder glueClientBuilder = GlueClient
                .builder()
                .httpClient(createSdkHttpClient(context))
                .region(Region.of(region));

        // The credentials service property is optional, so the lookup may yield null. Only override the
        // builder's credentials when a service is configured; otherwise the SDK builder keeps its default.
        final AWSCredentialsProviderService awsCredentialsProviderService = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
                .asControllerService(AWSCredentialsProviderService.class);
        if (awsCredentialsProviderService != null) {
            final AwsCredentialsProvider credentialsProvider = awsCredentialsProviderService.getAwsCredentialsProvider();
            glueClientBuilder.credentialsProvider(credentialsProvider);
        }

        final GlueSchemaRegistryClient glueSchemaRegistryClient = new GlueSchemaRegistryClient(glueClientBuilder.build(), schemaRegistryName);
        client = new CachingSchemaRegistryClient(glueSchemaRegistryClient, cacheSize, cacheExpiration);
    }

    /**
     * Retrieves a schema by name, and by version when the identifier carries one.
     *
     * @param schemaIdentifier identifier that must contain a schema name and may contain a version
     * @return the schema returned by the underlying client
     * @throws IOException declared by the underlying client
     * @throws SchemaNotFoundException if the identifier has no name, or as raised by the underlying client
     */
    @Override
    public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
        final String schemaName = schemaIdentifier.getName().orElseThrow(
                () -> new SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present")
        );

        final OptionalInt version = schemaIdentifier.getVersion();
        if (version.isPresent()) {
            return client.getSchema(schemaName, version.getAsInt());
        } else {
            return client.getSchema(schemaName);
        }
    }

    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        return schemaFields;
    }

    /**
     * Creates an allowable value for a region: the region id is the value, and the display name is the
     * region metadata description when metadata is available, otherwise the id.
     */
    private static AllowableValue createAllowableValue(final Region region) {
        final String description = region.metadata() != null ? region.metadata().description() : region.id();
        return new AllowableValue(region.id(), description, "AWS Region Code : " + region.id());
    }

    /** Lists every region known to the SDK as allowable values, sorted by display name. */
    private static AllowableValue[] getAvailableRegions() {
        final List<AllowableValue> values = new ArrayList<>();
        for (final Region region : Region.regions()) {
            values.add(createAllowableValue(region));
        }
        values.sort(Comparator.comparing(AllowableValue::getDisplayName));
        return values.toArray(new AllowableValue[0]);
    }

    /**
     * Creates the HTTP client handed to the Glue client builder, applying the communications timeout
     * and, when configured, TLS material and an HTTP proxy.
     *
     * @param context configuration of this controller service
     * @return the configured HTTP client
     */
    private SdkHttpClient createSdkHttpClient(final ConfigurationContext context) {
        final ApacheHttpClient.Builder builder = ApacheHttpClient.builder();

        // Keep the value as a long so large time periods are not narrowed to int.
        final Duration communicationsTimeout = Duration.ofMillis(context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
        builder.connectionTimeout(communicationsTimeout);
        builder.socketTimeout(communicationsTimeout);

        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService != null) {
            final TrustManager[] trustManagers = new TrustManager[]{sslContextService.createTrustManager()};
            builder.tlsTrustManagersProvider(() -> trustManagers);

            // Only configure key managers when a keystore path is available; Paths.get(null) would throw.
            // NOTE(review): assumes getKeyStoreFile() returns null when no keystore is configured - confirm.
            final String keyStoreFile = sslContextService.getKeyStoreFile();
            if (keyStoreFile != null && !keyStoreFile.isEmpty()) {
                final TlsKeyManagersProvider keyManagersProvider = FileStoreTlsKeyManagersProvider
                        .create(Paths.get(keyStoreFile), sslContextService.getKeyStoreType(), sslContextService.getKeyStorePassword());
                builder.tlsKeyManagersProvider(keyManagersProvider);
            }
        }

        final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
            if (context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet()) {
                final ProxyConfigurationService configurationService = context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
                return configurationService.getConfiguration();
            }
            return ProxyConfiguration.DIRECT_CONFIGURATION;
        });

        if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
            // The endpoint needs a scheme: "host:port" alone is parsed by java.net.URI as an opaque URI
            // whose scheme is the host name (no host component), and fails to parse for IP-address hosts.
            final software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyConfigBuilder = software.amazon.awssdk.http.apache.ProxyConfiguration.builder()
                    .endpoint(URI.create(String.format("http://%s:%s", proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort())));

            if (proxyConfig.hasCredential()) {
                proxyConfigBuilder.username(proxyConfig.getProxyUserName());
                proxyConfigBuilder.password(proxyConfig.getProxyUserPassword());
            }
            builder.proxyConfiguration(proxyConfigBuilder.build());
        }

        return builder.build();
    }
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.aws.schemaregistry.client;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletionException;
/**
 * {@link SchemaRegistryClient} decorator that caches schemas returned by a wrapped client.
 *
 * <p>Two caches are kept: one keyed by schema name, and one keyed by (schema name, version).
 * Both are bounded by the same maximum size and expire entries a fixed time after they are written.</p>
 */
public class CachingSchemaRegistryClient implements SchemaRegistryClient {
    private final SchemaRegistryClient client;
    private final LoadingCache<String, RecordSchema> nameCache;
    private final LoadingCache<Pair<String, Long>, RecordSchema> nameVersionCache;

    /**
     * @param toWrap client that performs the actual lookups on a cache miss
     * @param cacheSize maximum number of entries in each of the two caches
     * @param expirationNanos time after write, in nanoseconds, at which a cached entry expires
     */
    public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, final int cacheSize, final long expirationNanos) {
        this.client = toWrap;

        nameCache = Caffeine.newBuilder()
                .maximumSize(cacheSize)
                .expireAfterWrite(Duration.ofNanos(expirationNanos))
                .build(client::getSchema);
        nameVersionCache = Caffeine.newBuilder()
                .maximumSize(cacheSize)
                .expireAfterWrite(Duration.ofNanos(expirationNanos))
                .build(key -> client.getSchema(key.getLeft(), key.getRight()));
    }

    @Override
    public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
        return load(nameCache, schemaName);
    }

    @Override
    public RecordSchema getSchema(String schemaName, long version) throws IOException, SchemaNotFoundException {
        return load(nameVersionCache, Pair.of(schemaName, version));
    }

    /**
     * Reads through the given cache and restores the checked exceptions declared by
     * {@link SchemaRegistryClient}. A checked exception thrown by the cache loader reaches the caller
     * of {@code LoadingCache.get} wrapped in a {@link CompletionException}; without unwrapping, callers
     * that catch {@link IOException} or {@link SchemaNotFoundException} would never see them.
     *
     * @throws IOException if the wrapped client failed with an I/O error
     * @throws SchemaNotFoundException if the wrapped client reported that the schema is missing or invalid
     */
    private static <K> RecordSchema load(final LoadingCache<K, RecordSchema> cache, final K key) throws IOException, SchemaNotFoundException {
        try {
            return cache.get(key);
        } catch (final CompletionException e) {
            final Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause instanceof SchemaNotFoundException) {
                throw (SchemaNotFoundException) cause;
            }
            // Any other wrapped failure is propagated unchanged.
            throw e;
        }
    }
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.aws.schemaregistry.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.GetSchemaVersionRequest;
import software.amazon.awssdk.services.glue.model.GetSchemaVersionResponse;
import software.amazon.awssdk.services.glue.model.SchemaId;
import software.amazon.awssdk.services.glue.model.SchemaVersionNumber;
import java.io.IOException;
/**
 * {@link SchemaRegistryClient} that fetches schema versions from one AWS Glue Schema Registry
 * through a {@link GlueClient} and converts the returned definition into a {@link RecordSchema}
 * by parsing it as an Avro schema.
 */
public class GlueSchemaRegistryClient implements SchemaRegistryClient {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String NAMESPACE_FIELD_NAME = "namespace";

    private final GlueClient client;
    private final String registryName;

    /**
     * @param client Glue client used for all requests
     * @param registryName name of the registry every schema lookup is scoped to
     */
    public GlueSchemaRegistryClient(final GlueClient client, final String registryName) {
        this.client = client;
        this.registryName = registryName;
    }

    /**
     * Fetches the latest version of the named schema.
     *
     * @throws IOException if the schema definition cannot be read as JSON
     * @throws SchemaNotFoundException if no definition is returned or it is not a valid Avro schema
     */
    @Override
    public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
        final SchemaVersionNumber schemaVersionNumber = SchemaVersionNumber.builder()
                .latestVersion(true)
                .build();

        final GetSchemaVersionResponse schemaVersionResponse = getSchemaVersionResponse(schemaName, schemaVersionNumber);

        return createRecordSchema(schemaName, schemaVersionResponse);
    }

    /**
     * Fetches a specific version of the named schema.
     *
     * @throws IOException if the schema definition cannot be read as JSON
     * @throws SchemaNotFoundException if no definition is returned or it is not a valid Avro schema
     */
    @Override
    public RecordSchema getSchema(final String schemaName, final long version) throws IOException, SchemaNotFoundException {
        final SchemaVersionNumber schemaVersionNumber = SchemaVersionNumber.builder()
                .versionNumber(version)
                .build();

        final GetSchemaVersionResponse schemaVersionResponse = getSchemaVersionResponse(schemaName, schemaVersionNumber);

        return createRecordSchema(schemaName, schemaVersionResponse);
    }

    private GetSchemaVersionResponse getSchemaVersionResponse(final String schemaName, final SchemaVersionNumber schemaVersionNumber) {
        final SchemaId schemaId = buildSchemaId(schemaName);
        final GetSchemaVersionRequest request = buildSchemaVersionRequest(schemaVersionNumber, schemaId);
        return client.getSchemaVersion(request);
    }

    private GetSchemaVersionRequest buildSchemaVersionRequest(final SchemaVersionNumber schemaVersionNumber, final SchemaId schemaId) {
        return GetSchemaVersionRequest.builder()
                .schemaVersionNumber(schemaVersionNumber)
                .schemaId(schemaId)
                .build();
    }

    private SchemaId buildSchemaId(final String schemaName) {
        return SchemaId.builder()
                .registryName(registryName)
                .schemaName(schemaName)
                .build();
    }

    /**
     * Converts a Glue response into a {@link RecordSchema}.
     *
     * <p>The schema identifier name is the top-level {@code namespace} field of the definition when
     * present (unchanged from the previous behavior). A namespace is optional in Avro, so when the
     * field is absent or JSON null the requested schema name is used instead of failing with a
     * {@link NullPointerException}.</p>
     *
     * @param schemaName name the schema was requested under; fallback identifier name
     * @param schemaVersionResponse response holding the schema definition and version number
     */
    private RecordSchema createRecordSchema(final String schemaName, final GetSchemaVersionResponse schemaVersionResponse)
            throws SchemaNotFoundException, JsonProcessingException {
        final String schemaText = schemaVersionResponse.schemaDefinition();
        if (schemaText == null) {
            throw new SchemaNotFoundException("Glue Schema Registry returned no Schema Text for Schema with name " + schemaName);
        }

        final JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaText);
        // get() yields null when the field is missing or the root is not a JSON object.
        final JsonNode namespaceNode = schemaNode == null ? null : schemaNode.get(NAMESPACE_FIELD_NAME);
        final String identifierName = namespaceNode == null || namespaceNode.isNull() ? schemaName : namespaceNode.asText();
        final int version = schemaVersionResponse.versionNumber().intValue();

        try {
            final Schema avroSchema = new Schema.Parser().parse(schemaText);
            final SchemaIdentifier schemaId = SchemaIdentifier.builder()
                    .name(identifierName)
                    .version(version)
                    .build();
            return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
        } catch (final SchemaParseException spe) {
            // Keep the parser failure as the cause so the reason the schema was rejected is not lost.
            throw new SchemaNotFoundException("Obtained Schema with name " + identifierName
                    + " from Glue Schema Registry but the Schema Text that was returned is not a valid Avro Schema", spe);
        }
    }
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.aws.schemaregistry.client;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
/**
 * Minimal lookup contract for obtaining a {@link RecordSchema} by schema name,
 * optionally restricted to one version.
 */
public interface SchemaRegistryClient {

    /**
     * Looks up a schema by name only.
     *
     * @param schemaName name of the schema to look up
     * @return the schema found for that name
     * @throws IOException declared for implementations that fail while reading the schema
     * @throws SchemaNotFoundException declared for implementations that cannot provide a usable schema
     */
    RecordSchema getSchema(String schemaName) throws IOException, SchemaNotFoundException;

    /**
     * Looks up one version of a schema by name.
     *
     * @param schemaName name of the schema to look up
     * @param version version number of the schema
     * @return the schema found for that name and version
     * @throws IOException declared for implementations that fail while reading the schema
     * @throws SchemaNotFoundException declared for implementations that cannot provide a usable schema
     */
    RecordSchema getSchema(String schemaName, long version) throws IOException, SchemaNotFoundException;
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.aws.schemaregistry.AmazonGlueSchemaRegistry

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.aws.schemaregistry.client;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Verifies that {@link CachingSchemaRegistryClient} delegates to the wrapped client once per
 * distinct key and serves repeated lookups of the same key from its cache.
 */
@ExtendWith(MockitoExtension.class)
class CachingSchemaRegistryClientTest {
    private static final String SCHEMA_NAME = "schema";

    private static final RecordSchema TEST_SCHEMA = new SimpleRecordSchema(Arrays.asList(
            new RecordField("fieldName1", RecordFieldType.INT.getDataType()),
            new RecordField("fieldName2", RecordFieldType.STRING.getDataType())
    ));

    private static final RecordSchema TEST_SCHEMA_2 = new SimpleRecordSchema(Arrays.asList(
            new RecordField("fieldName3", RecordFieldType.INT.getDataType()),
            new RecordField("fieldName4", RecordFieldType.STRING.getDataType())
    ));

    @Mock
    private SchemaRegistryClient mockClient;

    private CachingSchemaRegistryClient cachingClient;

    @BeforeEach
    public void setUp() {
        // 100 entries per cache, entries expire one second after being written.
        final long oneSecondInNanos = TimeUnit.SECONDS.toNanos(1);
        cachingClient = new CachingSchemaRegistryClient(mockClient, 100, oneSecondInNanos);
    }

    @Test
    void testGetSchemaWithNameInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException {
        when(mockClient.getSchema(SCHEMA_NAME)).thenReturn(TEST_SCHEMA);

        final RecordSchema firstLookup = cachingClient.getSchema(SCHEMA_NAME);
        final RecordSchema secondLookup = cachingClient.getSchema(SCHEMA_NAME);

        assertEquals(TEST_SCHEMA, firstLookup);
        assertEquals(TEST_SCHEMA, secondLookup);
        // verify() defaults to exactly one invocation: the second lookup must be a cache hit.
        verify(mockClient).getSchema(SCHEMA_NAME);
    }

    @Test
    void testGetSchemaWithNameAndVersionInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException {
        final long requestedVersion = 1L;
        when(mockClient.getSchema(SCHEMA_NAME, requestedVersion)).thenReturn(TEST_SCHEMA);

        final RecordSchema firstLookup = cachingClient.getSchema(SCHEMA_NAME, requestedVersion);
        final RecordSchema secondLookup = cachingClient.getSchema(SCHEMA_NAME, requestedVersion);

        assertEquals(TEST_SCHEMA, firstLookup);
        assertEquals(TEST_SCHEMA, secondLookup);
        verify(mockClient).getSchema(SCHEMA_NAME, requestedVersion);
    }

    @Test
    void testGetSchemaWithNameAndVersionDoesNotCacheDifferentVersions() throws IOException, SchemaNotFoundException {
        final long firstVersion = 1L;
        final long secondVersion = 2L;
        when(mockClient.getSchema(SCHEMA_NAME, firstVersion)).thenReturn(TEST_SCHEMA);
        when(mockClient.getSchema(SCHEMA_NAME, secondVersion)).thenReturn(TEST_SCHEMA_2);

        final RecordSchema schemaForFirstVersion = cachingClient.getSchema(SCHEMA_NAME, firstVersion);
        final RecordSchema schemaForSecondVersion = cachingClient.getSchema(SCHEMA_NAME, secondVersion);

        assertEquals(TEST_SCHEMA, schemaForFirstVersion);
        assertEquals(TEST_SCHEMA_2, schemaForSecondVersion);
        // Each version is its own cache key, so the wrapped client is asked once per version.
        verify(mockClient).getSchema(SCHEMA_NAME, firstVersion);
        verify(mockClient).getSchema(SCHEMA_NAME, secondVersion);
    }
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.aws.schemaregistry.client;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.GetSchemaVersionRequest;
import software.amazon.awssdk.services.glue.model.GetSchemaVersionResponse;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Verifies that {@link GlueSchemaRegistryClient} issues one Glue request per lookup and turns the
 * returned Avro definition into a {@link RecordSchema} carrying the expected namespace and name.
 */
@ExtendWith(MockitoExtension.class)
class GlueSchemaRegistryClientTest {
    private static final String SCHEMA_NAME = "schema";
    private static final String REGISTRY_NAME = "registry";
    private static final String SCHEMA_DEFINITION = "{\"namespace\":\"com.example\",\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
    private static final GetSchemaVersionResponse MOCK_RESPONSE = GetSchemaVersionResponse.builder()
            .schemaDefinition(SCHEMA_DEFINITION)
            .versionNumber(1L)
            .build();
    private static final String EXPECTED_SCHEMA_NAMESPACE = "com.example";
    private static final String EXPECTED_SCHEMA_NAME = "User";

    @Mock
    private GlueClient mockClient;

    private GlueSchemaRegistryClient schemaRegistryClient;

    @Test
    void testGetSchemaWithNameInvokesClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
        schemaRegistryClient = new GlueSchemaRegistryClient(mockClient, REGISTRY_NAME);
        when(mockClient.getSchemaVersion(any(GetSchemaVersionRequest.class))).thenReturn(MOCK_RESPONSE);

        final RecordSchema actualSchema = schemaRegistryClient.getSchema(SCHEMA_NAME);

        assertExpectedSchema(actualSchema);
        verify(mockClient).getSchemaVersion(any(GetSchemaVersionRequest.class));
    }

    @Test
    void getSchemaWithNameAndVersionInvokesClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
        final int requestedVersion = 1;
        // Built locally rather than shared, with the same definition and version number as MOCK_RESPONSE.
        final GetSchemaVersionResponse versionedResponse = GetSchemaVersionResponse.builder()
                .schemaDefinition(SCHEMA_DEFINITION)
                .versionNumber(1L)
                .build();
        schemaRegistryClient = new GlueSchemaRegistryClient(mockClient, REGISTRY_NAME);
        when(mockClient.getSchemaVersion(any(GetSchemaVersionRequest.class))).thenReturn(versionedResponse);

        final RecordSchema actualSchema = schemaRegistryClient.getSchema(SCHEMA_NAME, requestedVersion);

        assertExpectedSchema(actualSchema);
        verify(mockClient).getSchemaVersion(any(GetSchemaVersionRequest.class));
    }

    /** Asserts that the schema is non-null and exposes the namespace and name from {@link #SCHEMA_DEFINITION}. */
    private static void assertExpectedSchema(final RecordSchema actualSchema) {
        assertNotNull(actualSchema);
        assertEquals(EXPECTED_SCHEMA_NAMESPACE, actualSchema.getSchemaNamespace().orElseThrow(() -> new RuntimeException("Schema namespace not found")));
        assertEquals(EXPECTED_SCHEMA_NAME, actualSchema.getSchemaName().orElseThrow(() -> new RuntimeException("Schema name not found")));
    }
}

View File

@ -38,6 +38,7 @@
<module>nifi-aws-abstract-processors</module>
<module>nifi-aws-parameter-value-providers</module>
<module>nifi-aws-parameter-providers</module>
<module>nifi-aws-schema-registry-service</module>
</modules>
<dependencyManagement>