diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
deleted file mode 100644
index 3471975778..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
+++ /dev/null
@@ -1,1510 +0,0 @@
- * Copyright 2016-2019 Cloudera, Inc.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package com.hortonworks.registries.schemaregistry.client;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.hortonworks.registries.auth.KerberosLogin;
-import com.hortonworks.registries.auth.Login;
-import com.hortonworks.registries.auth.NOOPLogin;
-import com.hortonworks.registries.auth.util.JaasConfiguration;
-import com.hortonworks.registries.common.SchemaRegistryServiceInfo;
-import com.hortonworks.registries.common.SchemaRegistryVersion;
-import com.hortonworks.registries.common.catalog.CatalogResponse;
-import com.hortonworks.registries.common.util.ClassLoaderAwareInvocationHandler;
-import com.hortonworks.registries.schemaregistry.CompatibilityResult;
-import com.hortonworks.registries.schemaregistry.ConfigEntry;
-import com.hortonworks.registries.schemaregistry.SchemaBranch;
-import com.hortonworks.registries.schemaregistry.SchemaFieldQuery;
-import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
-import com.hortonworks.registries.schemaregistry.SchemaMetadata;
-import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
-import com.hortonworks.registries.schemaregistry.SchemaProviderInfo;
-import com.hortonworks.registries.schemaregistry.SchemaVersion;
-import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
-import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
-import com.hortonworks.registries.schemaregistry.SchemaVersionMergeResult;
-import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever;
-import com.hortonworks.registries.schemaregistry.SerDesInfo;
-import com.hortonworks.registries.schemaregistry.SerDesPair;
-import com.hortonworks.registries.schemaregistry.cache.SchemaVersionInfoCache;
-import com.hortonworks.registries.schemaregistry.errors.IncompatibleSchemaException;
-import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaBranchDeletionException;
-import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaException;
-import com.hortonworks.registries.schemaregistry.errors.SchemaBranchAlreadyExistsException;
-import com.hortonworks.registries.schemaregistry.errors.SchemaBranchNotFoundException;
-import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
-import com.hortonworks.registries.schemaregistry.exceptions.RegistryRetryableException;
-import com.hortonworks.registries.schemaregistry.serde.SerDesException;
-import com.hortonworks.registries.schemaregistry.serde.SnapshotDeserializer;
-import com.hortonworks.registries.schemaregistry.serde.SnapshotSerializer;
-import com.hortonworks.registries.schemaregistry.serde.pull.PullDeserializer;
-import com.hortonworks.registries.schemaregistry.serde.pull.PullSerializer;
-import com.hortonworks.registries.schemaregistry.serde.push.PushDeserializer;
-import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException;
-import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo;
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.util.security.MessageDigestUtils;
-import org.glassfish.jersey.SslConfigurator;
-import org.glassfish.jersey.client.ClientConfig;
-import org.glassfish.jersey.client.ClientProperties;
-import org.glassfish.jersey.client.JerseyClientBuilder;
-import org.glassfish.jersey.media.multipart.BodyPart;
-import org.glassfish.jersey.media.multipart.FormDataMultiPart;
-import org.glassfish.jersey.media.multipart.MultiPart;
-import org.glassfish.jersey.media.multipart.MultiPartFeature;
-import org.glassfish.jersey.media.multipart.file.StreamDataBodyPart;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.SSLContext;
-import javax.security.auth.login.LoginException;
-import javax.ws.rs.BadRequestException;
-import javax.ws.rs.NotFoundException;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Proxy;
-import java.net.URLEncoder;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.DEFAULT_CONNECTION_TIMEOUT;
-import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.DEFAULT_READ_TIMEOUT;
-import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL;
- * NOTE: This class is a copy of the SchemaRegistryClient from https://github.com/hortonworks/registry.
- * The changes to this file are the following:
- * - Making the 'private Login login' become protected for access by sub-classes
- * - Making the 'private Configuration configuration' become protected for access by sub-classes
- *
- * This is the default implementation of {@link ISchemaRegistryClient} which connects to the given {@code rootCatalogURL}.
- *
- * An instance of SchemaRegistryClient can be instantiated by passing configuration properties like below.
- *
- * SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
- *
- *
- * There are different options available as mentioned in {@link Configuration} like
- *
- * - {@link Configuration#SCHEMA_REGISTRY_URL}.
- * - {@link Configuration#SCHEMA_METADATA_CACHE_SIZE}.
- * - {@link Configuration#SCHEMA_VERSION_CACHE_SIZE}.
- * - {@link Configuration#SCHEMA_TEXT_CACHE_SIZE}.
- *
- * and many other properties like {@link ClientProperties}
- *
- *
- * This can be used to
- * - register schema metadata
- * - add new versions of a schema
- * - fetch different versions of schema
- * - fetch latest version of a schema
- * - check whether the given schema text is compatible with a latest version of the schema
- * - register serializer/deserializer for a schema
- * - fetch serializer/deserializer for a schema
- *
- */
-public class SchemaRegistryClient implements ISchemaRegistryClient {
- private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryClient.class);
- private static final String SCHEMA_REGISTRY_PATH = "/schemaregistry";
- private static final String SCHEMAS_PATH = SCHEMA_REGISTRY_PATH + "/schemas/";
- private static final String SCHEMA_PROVIDERS_PATH = SCHEMA_REGISTRY_PATH + "/schemaproviders/";
- private static final String SCHEMAS_BY_ID_PATH = SCHEMA_REGISTRY_PATH + "/schemasById/";
- private static final String SCHEMA_VERSIONS_PATH = SCHEMAS_PATH + "versions/";
- private static final String FILES_PATH = SCHEMA_REGISTRY_PATH + "/files/";
- private static final String SERIALIZERS_PATH = SCHEMA_REGISTRY_PATH + "/serdes/";
- private static final String REGISTY_CLIENT_JAAS_SECTION = "RegistryClient";
- private static final Set> DESERIALIZER_INTERFACE_CLASSES = new LinkedHashSet<>(Arrays.asList(SnapshotDeserializer.class, PullDeserializer.class, PushDeserializer.class));
- private static final Set> SERIALIZER_INTERFACE_CLASSES = new LinkedHashSet<>(Arrays.asList(SnapshotSerializer.class, PullSerializer.class));
- private static final String SEARCH_FIELDS = SCHEMA_REGISTRY_PATH + "/search/schemas/fields";
- private static final long KERBEROS_SYNCHRONIZATION_TIMEOUT_MS = 180000;
- private static final String SSL_KEY_PASSWORD = "keyPassword";
- private static final String SSL_KEY_STORE_PATH = "keyStorePath";
- private static final SchemaRegistryVersion CLIENT_VERSION = SchemaRegistryServiceInfo.get().version();
- protected Login login;
- private final Client client;
- private final UrlSelector urlSelector;
- private final Map urlWithTargets;
- protected final Configuration configuration;
- private final ClassLoaderCache classLoaderCache;
- private final SchemaVersionInfoCache schemaVersionInfoCache;
- private final SchemaMetadataCache schemaMetadataCache;
- private final Cache schemaTextCache;
- private static final String SSL_CONFIGURATION_KEY = "schema.registry.client.ssl";
- private static final String HOSTNAME_VERIFIER_CLASS_KEY = "hostnameVerifierClass";
- /**
- * Creates {@link SchemaRegistryClient} instance with the given yaml config.
- *
- * @param confFile config file which contains the configuration entries.
- *
- * @throws IOException when any IOException occurs while reading the given confFile
- */
- public SchemaRegistryClient(File confFile) throws IOException {
- this(buildConfFromFile(confFile));
- }
- private static Map buildConfFromFile(File confFile) throws IOException {
- try (FileInputStream fis = new FileInputStream(confFile)) {
- return (Map) new Yaml().load(IOUtils.toString(fis, "UTF-8"));
- }
- }
- public SchemaRegistryClient(Map conf) {
- configuration = new Configuration(conf);
- initializeSecurityContext();
- ClientConfig config = createClientConfig(conf);
- ClientBuilder clientBuilder = JerseyClientBuilder.newBuilder()
- .withConfig(config)
- .property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE);
- if (conf.containsKey(SSL_CONFIGURATION_KEY)) {
- Map sslConfigurations = (Map) conf.get(SSL_CONFIGURATION_KEY);
- clientBuilder.sslContext(createSSLContext(sslConfigurations));
- if (sslConfigurations.containsKey(HOSTNAME_VERIFIER_CLASS_KEY)) {
- HostnameVerifier hostNameVerifier = null;
- String hostNameVerifierClassName = sslConfigurations.get(HOSTNAME_VERIFIER_CLASS_KEY);
- try {
- hostNameVerifier = (HostnameVerifier) Class.forName(hostNameVerifierClassName).newInstance();
- } catch (Exception e) {
- throw new RuntimeException("Failed to instantiate hostNameVerifierClass : " + hostNameVerifierClassName, e);
- }
- clientBuilder.hostnameVerifier(hostNameVerifier);
- }
- }
- client = clientBuilder.build();
- client.register(MultiPartFeature.class);
- // get list of urls and create given or default UrlSelector.
- urlSelector = createUrlSelector();
- urlWithTargets = new ConcurrentHashMap<>();
- classLoaderCache = new ClassLoaderCache(this);
- schemaVersionInfoCache = new SchemaVersionInfoCache(
- new SchemaVersionRetriever() {
- @Override
- public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException {
- return doGetSchemaVersionInfo(key);
- }
- @Override
- public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException {
- return doGetSchemaVersionInfo(key);
- }
- },
- ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_SIZE.name())).intValue(),
- ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue() * 1000L
- );
- SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher = createSchemaMetadataFetcher();
- schemaMetadataCache = new SchemaMetadataCache(((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_SIZE
- .name())).longValue(),
- ((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS
- .name())).longValue(),
- schemaMetadataFetcher);
- schemaTextCache = Caffeine.newBuilder()
- .maximumSize(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_SIZE
- .name())).longValue())
- .expireAfterAccess(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS
- .name())).longValue(),
- TimeUnit.SECONDS)
- .build();
- }
- protected void initializeSecurityContext() {
- String saslJaasConfig = configuration.getValue(Configuration.SASL_JAAS_CONFIG.name());
- if (saslJaasConfig != null) {
- KerberosLogin kerberosLogin = new KerberosLogin(KERBEROS_SYNCHRONIZATION_TIMEOUT_MS);
- try {
- kerberosLogin.configure(new HashMap<>(), REGISTY_CLIENT_JAAS_SECTION, new JaasConfiguration(REGISTY_CLIENT_JAAS_SECTION, saslJaasConfig));
- kerberosLogin.login();
- login = kerberosLogin;
- return;
- } catch (LoginException e) {
- LOG.error("Failed to initialize the dynamic JAAS config: " + saslJaasConfig + ". Attempting static JAAS config.");
- } catch (Exception e) {
- LOG.error("Failed to parse the dynamic JAAS config. Attempting static JAAS config.", e);
- }
- }
- String jaasConfigFile = System.getProperty("java.security.auth.login.config");
- if (jaasConfigFile != null && !jaasConfigFile.trim().isEmpty()) {
- KerberosLogin kerberosLogin = new KerberosLogin(KERBEROS_SYNCHRONIZATION_TIMEOUT_MS);
- kerberosLogin.configure(new HashMap<>(), REGISTY_CLIENT_JAAS_SECTION);
- try {
- kerberosLogin.login();
- login = kerberosLogin;
- } catch (LoginException e) {
- LOG.error("Could not login using jaas config section " + REGISTY_CLIENT_JAAS_SECTION);
- login = new NOOPLogin();
- }
- } else {
- LOG.warn("System property for jaas config file is not defined. Its okay if schema registry is not running in secured mode");
- login = new NOOPLogin();
- }
- }
- protected SSLContext createSSLContext(Map sslConfigurations) {
- SslConfigurator sslConfigurator = SslConfigurator.newInstance();
- if (sslConfigurations.containsKey(SSL_KEY_STORE_PATH)) {
- sslConfigurator.keyStoreType(sslConfigurations.get("keyStoreType"))
- .keyStoreFile(sslConfigurations.get(SSL_KEY_STORE_PATH))
- .keyStorePassword(sslConfigurations.get("keyStorePassword"))
- .keyStoreProvider(sslConfigurations.get("keyStoreProvider"))
- .keyManagerFactoryAlgorithm(sslConfigurations.get("keyManagerFactoryAlgorithm"))
- .keyManagerFactoryProvider(sslConfigurations.get("keyManagerFactoryProvider"));
- if (sslConfigurations.containsKey(SSL_KEY_PASSWORD)) {
- sslConfigurator.keyPassword(sslConfigurations.get(SSL_KEY_PASSWORD));
- }
- }
- sslConfigurator.trustStoreType(sslConfigurations.get("trustStoreType"))
- .trustStoreFile(sslConfigurations.get("trustStorePath"))
- .trustStorePassword(sslConfigurations.get("trustStorePassword"))
- .trustStoreProvider(sslConfigurations.get("trustStoreProvider"))
- .trustManagerFactoryAlgorithm(sslConfigurations.get("trustManagerFactoryAlgorithm"))
- .trustManagerFactoryProvider(sslConfigurations.get("trustManagerFactoryProvider"));
- sslConfigurator.securityProtocol(sslConfigurations.get("protocol"));
- return sslConfigurator.createSSLContext();
- }
- private SchemaRegistryTargets currentSchemaRegistryTargets() {
- String url = urlSelector.select();
- urlWithTargets.computeIfAbsent(url, s -> new SchemaRegistryTargets(client.target(s)));
- return urlWithTargets.get(url);
- }
- private static class SchemaRegistryTargets {
- private final WebTarget schemaProvidersTarget;
- private final WebTarget schemasTarget;
- private final WebTarget schemasByIdTarget;
- private final WebTarget rootTarget;
- private final WebTarget searchFieldsTarget;
- private final WebTarget serializersTarget;
- private final WebTarget filesTarget;
- private final WebTarget schemaVersionsTarget;
- private final WebTarget schemaVersionsByIdTarget;
- private final WebTarget schemaVersionsStatesMachineTarget;
- SchemaRegistryTargets(WebTarget rootTarget) {
- this.rootTarget = rootTarget;
- schemaProvidersTarget = rootTarget.path(SCHEMA_PROVIDERS_PATH);
- schemasTarget = rootTarget.path(SCHEMAS_PATH);
- schemasByIdTarget = rootTarget.path(SCHEMAS_BY_ID_PATH);
- schemaVersionsByIdTarget = schemasTarget.path("versionsById");
- schemaVersionsTarget = rootTarget.path(SCHEMA_VERSIONS_PATH);
- schemaVersionsStatesMachineTarget = schemaVersionsTarget.path("statemachine");
- searchFieldsTarget = rootTarget.path(SEARCH_FIELDS);
- serializersTarget = rootTarget.path(SERIALIZERS_PATH);
- filesTarget = rootTarget.path(FILES_PATH);
- }
- }
- private UrlSelector createUrlSelector() {
- UrlSelector urlSelector = null;
- String rootCatalogURL = configuration.getValue(SCHEMA_REGISTRY_URL.name());
- String urlSelectorClass = configuration.getValue(Configuration.URL_SELECTOR_CLASS.name());
- if (urlSelectorClass == null) {
- urlSelector = new LoadBalancedFailoverUrlSelector(rootCatalogURL);
- } else {
- try {
- urlSelector = (UrlSelector) Class.forName(urlSelectorClass)
- .getConstructor(String.class)
- .newInstance(rootCatalogURL);
- } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException
- | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- urlSelector.init(configuration.getConfig());
- return urlSelector;
- }
- private SchemaMetadataCache.SchemaMetadataFetcher createSchemaMetadataFetcher() {
- return new SchemaMetadataCache.SchemaMetadataFetcher() {
- @Override
- public SchemaMetadataInfo fetch(String name) throws SchemaNotFoundException {
- try {
- return getEntity(currentSchemaRegistryTargets().schemasTarget.path(name), SchemaMetadataInfo.class);
- } catch (NotFoundException e) {
- throw new SchemaNotFoundException(e);
- }
- }
- @Override
- public SchemaMetadataInfo fetch(Long id) throws SchemaNotFoundException {
- try {
- return getEntity(currentSchemaRegistryTargets().schemasByIdTarget.path(id.toString()), SchemaMetadataInfo.class);
- } catch (NotFoundException e) {
- throw new SchemaNotFoundException(e);
- }
- }
- };
- }
- protected ClientConfig createClientConfig(Map conf) {
- ClientConfig config = new ClientConfig();
- config.property(ClientProperties.CONNECT_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT);
- config.property(ClientProperties.READ_TIMEOUT, DEFAULT_READ_TIMEOUT);
- config.property(ClientProperties.FOLLOW_REDIRECTS, true);
- for (Map.Entry entry : conf.entrySet()) {
- config.property(entry.getKey(), entry.getValue());
- }
- return config;
- }
- public Configuration getConfiguration() {
- return configuration;
- }
- @Override
- public Collection getSupportedSchemaProviders() {
- return getEntities(currentSchemaRegistryTargets().schemaProvidersTarget, SchemaProviderInfo.class);
- }
- @Override
- public Long registerSchemaMetadata(SchemaMetadata schemaMetadata) {
- return addSchemaMetadata(schemaMetadata);
- }
- @Override
- public Long addSchemaMetadata(SchemaMetadata schemaMetadata) {
- SchemaMetadataInfo schemaMetadataInfo = schemaMetadataCache.getIfPresent(SchemaMetadataCache.Key.of(schemaMetadata
- .getName()));
- if (schemaMetadataInfo == null) {
- return doRegisterSchemaMetadata(schemaMetadata, currentSchemaRegistryTargets().schemasTarget);
- }
- return schemaMetadataInfo.getId();
- }
- @Override
- public SchemaMetadataInfo updateSchemaMetadata(String schemaName, SchemaMetadata schemaMetadata) {
- SchemaMetadataInfo schemaMetadataInfo = postEntity(currentSchemaRegistryTargets().schemasTarget.path(schemaName), schemaMetadata, SchemaMetadataInfo.class);
- if (schemaMetadataInfo != null) {
- schemaMetadataCache.put(SchemaMetadataCache.Key.of(schemaName), schemaMetadataInfo);
- }
- return schemaMetadataInfo;
- }
- private Long doRegisterSchemaMetadata(SchemaMetadata schemaMetadata, WebTarget schemasTarget) {
- try {
- return postEntity(schemasTarget, schemaMetadata, Long.class);
- } catch(BadRequestException ex) {
- Response response = ex.getResponse();
- CatalogResponse catalogResponse = SchemaRegistryClient.readCatalogResponse(response.readEntity(String.class));
- if(catalogResponse.getResponseCode() == CatalogResponse.ResponseMessage.ENTITY_CONFLICT.getCode()) {
- return getSchemaMetadataInfo(schemaMetadata.getName()).getId();
- } else {
- throw ex;
- }
- }
- }
- @Override
- public SchemaMetadataInfo getSchemaMetadataInfo(String schemaName) {
- return schemaMetadataCache.get(SchemaMetadataCache.Key.of(schemaName));
- }
- @Override
- public SchemaMetadataInfo getSchemaMetadataInfo(Long schemaMetadataId) {
- return schemaMetadataCache.get(SchemaMetadataCache.Key.of(schemaMetadataId));
- }
- @Override
- public void deleteSchema(String schemaName) throws SchemaNotFoundException {
- Collection schemaVersionInfos = getAllVersions(schemaName);
- schemaMetadataCache.invalidateSchemaMetadata(SchemaMetadataCache.Key.of(schemaName));
- if (schemaVersionInfos != null) {
- for (SchemaVersionInfo schemaVersionInfo: schemaVersionInfos) {
- SchemaIdVersion schemaIdVersion = new SchemaIdVersion(schemaVersionInfo.getId());
- schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion));
- }
- }
- WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s", schemaName));
- Response response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public Response run() {
- return target.request(MediaType.APPLICATION_JSON_TYPE).delete(Response.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- int status = response.getStatus();
- if (status == Response.Status.NOT_FOUND.getStatusCode()) {
- throw new SchemaNotFoundException(response.readEntity(String.class));
- } else if (status == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
- throw new RuntimeException(response.readEntity(String.class));
- }
- }
- @Override
- public SchemaIdVersion addSchemaVersion(SchemaMetadata schemaMetadata, SchemaVersion schemaVersion, boolean disableCanonicalCheck) throws
- InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
- return addSchemaVersion(SchemaBranch.MASTER_BRANCH, schemaMetadata, schemaVersion, disableCanonicalCheck);
- }
- @Override
- public SchemaIdVersion addSchemaVersion(String schemaBranchName, SchemaMetadata schemaMetadata, SchemaVersion schemaVersion, boolean disableCanonicalCheck) throws
- InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
- // get it, if it exists in cache
- SchemaDigestEntry schemaDigestEntry = buildSchemaTextEntry(schemaVersion, schemaMetadata.getName());
- SchemaIdVersion schemaIdVersion = schemaTextCache.getIfPresent(schemaDigestEntry);
- if (schemaIdVersion == null) {
- //register schema metadata if it does not exist
- Long metadataId = registerSchemaMetadata(schemaMetadata);
- if (metadataId == null) {
- LOG.error("Schema Metadata [{}] is not registered successfully", schemaMetadata);
- throw new RuntimeException("Given SchemaMetadata could not be registered: " + schemaMetadata);
- }
- // add schemaIdVersion
- schemaIdVersion = addSchemaVersion(schemaBranchName, schemaMetadata.getName(), schemaVersion, disableCanonicalCheck);
- }
- return schemaIdVersion;
- }
- @Override
- public SchemaIdVersion uploadSchemaVersion(String schemaName, String description, InputStream schemaVersionTextFile) throws
- InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
- return uploadSchemaVersion(SchemaBranch.MASTER_BRANCH, schemaName, description, schemaVersionTextFile);
- }
- public SchemaIdVersion uploadSchemaVersion(final String schemaBranchName,
- final String schemaName,
- final String description,
- final InputStream schemaVersionInputStream)
- throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
- SchemaMetadataInfo schemaMetadataInfo = getSchemaMetadataInfo(schemaName);
- if (schemaMetadataInfo == null) {
- throw new SchemaNotFoundException("Schema with name " + schemaName + " not found");
- }
- StreamDataBodyPart streamDataBodyPart = new StreamDataBodyPart("file", schemaVersionInputStream);
- WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(schemaName).path("/versions/upload").queryParam("branch",schemaBranchName);
- MultiPart multipartEntity =
- new FormDataMultiPart()
- .field("description", description, MediaType.APPLICATION_JSON_TYPE)
- .bodyPart(streamDataBodyPart);
- Entity multiPartEntity = Entity.entity(multipartEntity, MediaType.MULTIPART_FORM_DATA);
- Response response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public Response run() {
- return target.request().post(multiPartEntity, Response.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- return handleSchemaIdVersionResponse(schemaMetadataInfo, response);
- }
- private SchemaDigestEntry buildSchemaTextEntry(SchemaVersion schemaVersion, String name) {
- byte[] digest;
- try {
- digest = MessageDigestUtils.getDigest(schemaVersion.getSchemaText().getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- // storing schema text string is expensive, so storing digest in cache's key.
- return new SchemaDigestEntry(name, digest);
- }
- @Override
- public SchemaIdVersion addSchemaVersion(final String schemaName, final SchemaVersion schemaVersion, boolean disableCanonicalCheck)
- throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
- return addSchemaVersion(SchemaBranch.MASTER_BRANCH, schemaName, schemaVersion, disableCanonicalCheck);
- }
- @Override
- public SchemaIdVersion addSchemaVersion(final String schemaBranchName, final String schemaName, final SchemaVersion schemaVersion, boolean disableCanonicalCheck)
- throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
- try {
- return schemaTextCache.get(buildSchemaTextEntry(schemaVersion, schemaName),
- key -> {
- try {
- return doAddSchemaVersion(schemaBranchName, schemaName, schemaVersion, disableCanonicalCheck);
- } catch (final Exception e) {
- LOG.error("Encountered error while adding new version [{}] of schema [{}] and error [{}]", schemaVersion, schemaName, e);
- throw new RuntimeException(e);
- }
- });
- } catch (final RuntimeException e) {
- final Throwable cause = e.getCause();
- if (cause instanceof InvalidSchemaException)
- throw (InvalidSchemaException) cause;
- else if (cause instanceof IncompatibleSchemaException) {
- throw (IncompatibleSchemaException) cause;
- } else if (cause instanceof SchemaNotFoundException) {
- throw (SchemaNotFoundException) cause;
- } else {
- throw e;
- }
- }
- }
- @Override
- public void deleteSchemaVersion(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException, SchemaLifecycleException {
- schemaVersionInfoCache.invalidateSchema(new SchemaVersionInfoCache.Key(schemaVersionKey));
- WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s/versions/%s", schemaVersionKey
- .getSchemaName(), schemaVersionKey.getVersion()));
- Response response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public Response run() {
- return target.request(MediaType.APPLICATION_JSON_TYPE).delete(Response.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- handleDeleteSchemaResponse(response);
- }
- private void handleDeleteSchemaResponse(Response response) throws SchemaNotFoundException, SchemaLifecycleException {
- String msg = response.readEntity(String.class);
- switch (Response.Status.fromStatusCode(response.getStatus())) {
- case NOT_FOUND:
- throw new SchemaNotFoundException(msg);
- throw new SchemaLifecycleException(msg);
- throw new RuntimeException(msg);
- }
- }
- private SchemaIdVersion doAddSchemaVersion(String schemaBranchName, String schemaName,
- SchemaVersion schemaVersion, boolean disableCanonicalCheck) throws IncompatibleSchemaException, InvalidSchemaException, SchemaNotFoundException {
- SchemaMetadataInfo schemaMetadataInfo = getSchemaMetadataInfo(schemaName);
- if (schemaMetadataInfo == null) {
- throw new SchemaNotFoundException("Schema with name " + schemaName + " not found");
- }
- WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(schemaName).path("/versions").queryParam("branch", schemaBranchName)
- .queryParam("disableCanonicalCheck", disableCanonicalCheck);
- Response response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public Response run() {
- return target.request(MediaType.APPLICATION_JSON_TYPE).post(Entity.json(schemaVersion), Response.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- return handleSchemaIdVersionResponse(schemaMetadataInfo, response);
- }
- private SchemaIdVersion handleSchemaIdVersionResponse(SchemaMetadataInfo schemaMetadataInfo,
- Response response) throws IncompatibleSchemaException, InvalidSchemaException {
- int status = response.getStatus();
- String msg = response.readEntity(String.class);
- if (status == Response.Status.BAD_REQUEST.getStatusCode() || status == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
- CatalogResponse catalogResponse = readCatalogResponse(msg);
- if (CatalogResponse.ResponseMessage.INCOMPATIBLE_SCHEMA.getCode() == catalogResponse.getResponseCode()) {
- throw new IncompatibleSchemaException(catalogResponse.getResponseMessage());
- } else if (CatalogResponse.ResponseMessage.INVALID_SCHEMA.getCode() == catalogResponse.getResponseCode()) {
- throw new InvalidSchemaException(catalogResponse.getResponseMessage());
- } else {
- throw new RuntimeException(catalogResponse.getResponseMessage());
- }
- }
- Integer version = readEntity(msg, Integer.class);
- SchemaVersionInfo schemaVersionInfo = doGetSchemaVersionInfo(new SchemaVersionKey(schemaMetadataInfo.getSchemaMetadata()
- .getName(), version));
- return new SchemaIdVersion(schemaMetadataInfo.getId(), version, schemaVersionInfo.getId());
- }
- public static CatalogResponse readCatalogResponse(String msg) {
- ObjectMapper objectMapper = new ObjectMapper();
- try {
- JsonNode node = objectMapper.readTree(msg);
- return objectMapper.treeToValue(node, CatalogResponse.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- public SchemaVersionInfo getSchemaVersionInfo(SchemaIdVersion schemaIdVersion) throws SchemaNotFoundException {
- try {
- return schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion));
- } catch (SchemaNotFoundException ex) {
- throw ex;
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- @Override
- public SchemaVersionInfo getLatestSchemaVersionInfo(String schemaName) throws SchemaNotFoundException {
- return getLatestSchemaVersionInfo(SchemaBranch.MASTER_BRANCH, schemaName);
- }
- @Override
- public SchemaVersionInfo getSchemaVersionInfo(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException {
- try {
- return schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaVersionKey));
- } catch (SchemaNotFoundException ex) {
- throw ex;
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- private SchemaVersionInfo doGetSchemaVersionInfo(SchemaIdVersion schemaIdVersion) throws SchemaNotFoundException {
- if (schemaIdVersion.getSchemaVersionId() != null) {
- LOG.info("Getting schema version from target registry for [{}]", schemaIdVersion.getSchemaVersionId());
- return getEntity(currentSchemaRegistryTargets()
- .schemaVersionsByIdTarget
- .path(schemaIdVersion.getSchemaVersionId().toString()),
- SchemaVersionInfo.class);
- } else if (schemaIdVersion.getSchemaMetadataId() != null) {
- SchemaMetadataInfo schemaMetadataInfo = getSchemaMetadataInfo(schemaIdVersion.getSchemaMetadataId());
- SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaMetadataInfo.getSchemaMetadata()
- .getName(), schemaIdVersion.getVersion());
- LOG.info("Getting schema version from target registry for key [{}]", schemaVersionKey);
- return doGetSchemaVersionInfo(schemaVersionKey);
- }
- throw new IllegalArgumentException("Given argument not valid: " + schemaIdVersion);
- }
- private SchemaVersionInfo doGetSchemaVersionInfo(SchemaVersionKey schemaVersionKey) {
- LOG.info("Getting schema version from target registry for [{}]", schemaVersionKey);
- String schemaName = schemaVersionKey.getSchemaName();
- WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s/versions/%d", schemaName, schemaVersionKey
- .getVersion()));
- return getEntity(webTarget, SchemaVersionInfo.class);
- }
- @Override
- public SchemaVersionInfo getLatestSchemaVersionInfo(String schemaBranchName, String schemaName) throws SchemaNotFoundException {
- WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/versions/latest").queryParam("branch", schemaBranchName);;
- return getEntity(webTarget, SchemaVersionInfo.class);
- }
- @Override
- public Collection getAllVersions(String schemaName) throws SchemaNotFoundException {
- return getAllVersions(SchemaBranch.MASTER_BRANCH, schemaName);
- }
- private static String encode(String schemaName) {
- try {
- return URLEncoder.encode(schemaName, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
- @Override
- public void enableSchemaVersion(Long schemaVersionId)
- throws SchemaNotFoundException, SchemaLifecycleException, IncompatibleSchemaException {
- try {
- transitionSchemaVersionState(schemaVersionId, "enable", null);
- } catch (SchemaLifecycleException e) {
- Throwable cause = e.getCause();
- if (cause != null && cause instanceof IncompatibleSchemaException) {
- throw (IncompatibleSchemaException) cause;
- }
- throw e;
- }
- }
- @Override
- public void disableSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
- transitionSchemaVersionState(schemaVersionId, "disable", null);
- }
- @Override
- public void deleteSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
- transitionSchemaVersionState(schemaVersionId, "delete", null);
- }
- @Override
- public void archiveSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
- transitionSchemaVersionState(schemaVersionId, "archive", null);
- }
- @Override
- public void startSchemaVersionReview(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
- transitionSchemaVersionState(schemaVersionId, "startReview", null);
- }
- @Override
- public SchemaVersionMergeResult mergeSchemaVersion(Long schemaVersionId, boolean disableCanonicalCheck) throws SchemaNotFoundException, IncompatibleSchemaException {
- WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(schemaVersionId + "/merge").queryParam("disableCanonicalCheck", disableCanonicalCheck);
- Response response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public Response run() {
- return target.request().post(null);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- int status = response.getStatus();
- if (status == Response.Status.OK.getStatusCode()) {
- String msg = response.readEntity(String.class);
- return readEntity(msg, SchemaVersionMergeResult.class);
- } else if (status == Response.Status.NOT_FOUND.getStatusCode()) {
- throw new SchemaNotFoundException(response.readEntity(String.class));
- } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new IncompatibleSchemaException(response.readEntity(String.class));
- } else {
- throw new RuntimeException(response.readEntity(String.class));
- }
- }
- @Override
- public void transitionState(Long schemaVersionId,
- Byte targetStateId,
- byte[] transitionDetails) throws SchemaNotFoundException, SchemaLifecycleException {
- boolean result = transitionSchemaVersionState(schemaVersionId, targetStateId.toString(), transitionDetails);
- }
- @Override
- public SchemaVersionLifecycleStateMachineInfo getSchemaVersionLifecycleStateMachineInfo() {
- return getEntity(currentSchemaRegistryTargets().schemaVersionsStatesMachineTarget,
- SchemaVersionLifecycleStateMachineInfo.class);
- }
- @Override
- public SchemaBranch createSchemaBranch(Long schemaVersionId, SchemaBranch schemaBranch) throws SchemaBranchAlreadyExistsException, SchemaNotFoundException {
- WebTarget target = currentSchemaRegistryTargets().schemasTarget.path("versionsById/" + schemaVersionId + "/branch");
- Response response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public Response run() {
- return target.request(MediaType.APPLICATION_JSON_TYPE).post(Entity.json(schemaBranch), Response.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- int status = response.getStatus();
- if (status == Response.Status.OK.getStatusCode()) {
- String msg = response.readEntity(String.class);
- SchemaBranch returnedSchemaBranch = readEntity(msg, SchemaBranch.class);
- return returnedSchemaBranch;
- } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new SchemaNotFoundException(response.readEntity(String.class));
- } else if (status == Response.Status.CONFLICT.getStatusCode()) {
- throw new SchemaBranchAlreadyExistsException(response.readEntity(String.class));
- } else {
- throw new RuntimeException(response.readEntity(String.class));
- }
- }
- @Override
- public Collection getSchemaBranches(String schemaName) throws SchemaNotFoundException {
- WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/branches");
- Response response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public Response run() {
- return target.request().get();
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- int status = response.getStatus();
- if (status == Response.Status.NOT_FOUND.getStatusCode()) {
- throw new SchemaNotFoundException(response.readEntity(String.class));
- } else if (status != Response.Status.OK.getStatusCode()) {
- throw new RuntimeException(response.readEntity(String.class));
- }
- return parseResponseAsEntities(response.readEntity(String.class), SchemaBranch.class);
- }
- @Override
- public void deleteSchemaBranch(Long schemaBranchId) throws SchemaBranchNotFoundException, InvalidSchemaBranchDeletionException {
- WebTarget target = currentSchemaRegistryTargets().schemasTarget.path("branch/" + schemaBranchId);
- Response response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public Response run() {
- return target.request().delete();
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- int status = response.getStatus();
- if (status == Response.Status.NOT_FOUND.getStatusCode()) {
- throw new SchemaBranchNotFoundException(response.readEntity(String.class));
- } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new InvalidSchemaBranchDeletionException(response.readEntity(String.class));
- } else if (status != Response.Status.OK.getStatusCode()) {
- throw new RuntimeException(response.readEntity(String.class));
- }
- }
- @Override
- public Collection getAllVersions(String schemaBranchName, String schemaName, List stateIds) throws SchemaNotFoundException, SchemaBranchNotFoundException {
- WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/versions").queryParam("branch", schemaBranchName).queryParam("states", stateIds.toArray());
- return getEntities(webTarget, SchemaVersionInfo.class);
- }
- private boolean transitionSchemaVersionState(Long schemaVersionId,
- String operationOrTargetState,
- byte[] transitionDetails) throws SchemaNotFoundException, SchemaLifecycleException {
- WebTarget webTarget = currentSchemaRegistryTargets().schemaVersionsTarget.path(schemaVersionId + "/state/" + operationOrTargetState);
- Response response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public Response run() {
- return webTarget.request().post(Entity.text(transitionDetails));
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- boolean result = handleSchemaLifeCycleResponse(response);
- // invalidate this entry from cache.
- schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(new SchemaIdVersion(schemaVersionId)));
- return result;
- }
- private boolean handleSchemaLifeCycleResponse(Response response) throws SchemaNotFoundException, SchemaLifecycleException {
- boolean result;
- int status = response.getStatus();
- if (status == Response.Status.OK.getStatusCode()) {
- result = response.readEntity(Boolean.class);
- } else if (status == Response.Status.NOT_FOUND.getStatusCode()) {
- throw new SchemaNotFoundException(response.readEntity(String.class));
- } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) {
- CatalogResponse catalogResponse = readCatalogResponse(response.readEntity(String.class));
- if (catalogResponse.getResponseCode() == CatalogResponse.ResponseMessage.INCOMPATIBLE_SCHEMA.getCode()) {
- throw new SchemaLifecycleException(new IncompatibleSchemaException(catalogResponse.getResponseMessage()));
- }
- throw new SchemaLifecycleException(catalogResponse.getResponseMessage());
- } else {
- throw new RuntimeException(response.readEntity(String.class));
- }
- return result;
- }
- @Override
- public Collection getAllVersions(String schemaBranchName, String schemaName) throws SchemaNotFoundException {
- WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/versions").queryParam("branch", schemaBranchName);
- return getEntities(webTarget, SchemaVersionInfo.class);
- }
- @Override
- public CompatibilityResult checkCompatibility(String schemaName, String toSchemaText) throws SchemaNotFoundException, SchemaBranchNotFoundException {
- return checkCompatibility(SchemaBranch.MASTER_BRANCH, schemaName, toSchemaText);
- }
- @Override
- public CompatibilityResult checkCompatibility(String schemaBranchName, String schemaName,
- String toSchemaText) throws SchemaNotFoundException {
- WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/compatibility").queryParam("branch", schemaBranchName);
- String response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public String run() {
- return webTarget.request().post(Entity.text(toSchemaText), String.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- return readEntity(response, CompatibilityResult.class);
- }
- @Override
- public boolean isCompatibleWithAllVersions(String schemaName, String toSchemaText) throws SchemaNotFoundException, SchemaBranchNotFoundException {
- return isCompatibleWithAllVersions(SchemaBranch.MASTER_BRANCH, schemaName, toSchemaText);
- }
- @Override
- public boolean isCompatibleWithAllVersions(String schemaBranchName, String schemaName, String toSchemaText) throws SchemaNotFoundException, SchemaBranchNotFoundException {
- return checkCompatibility(schemaBranchName, schemaName, toSchemaText).isCompatible();
- }
- @Override
- public Collection findSchemasByFields(SchemaFieldQuery schemaFieldQuery) {
- WebTarget target = currentSchemaRegistryTargets().searchFieldsTarget;
- for (Map.Entry entry : schemaFieldQuery.toQueryMap().entrySet()) {
- target = target.queryParam(entry.getKey(), entry.getValue());
- }
- return getEntities(target, SchemaVersionKey.class);
- }
- @Override
- public String uploadFile(InputStream inputStream) {
- MultiPart multiPart = new MultiPart();
- BodyPart filePart = new StreamDataBodyPart("file", inputStream, "file");
- multiPart.bodyPart(filePart);
- try {
- return login.doAction(new PrivilegedAction() {
- @Override
- public String run() {
- return currentSchemaRegistryTargets().filesTarget.request()
- .post(Entity.entity(multiPart, MediaType.MULTIPART_FORM_DATA), String.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- }
- @Override
- public InputStream downloadFile(String fileId) {
- try {
- return login.doAction(new PrivilegedAction() {
- @Override
- public InputStream run() {
- return currentSchemaRegistryTargets().filesTarget.path("download/" + encode(fileId))
- .request()
- .get(InputStream.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- }
- @Override
- public Long addSerDes(SerDesPair serDesPair) {
- return postEntity(currentSchemaRegistryTargets().serializersTarget, serDesPair, Long.class);
- }
- @Override
- public void mapSchemaWithSerDes(String schemaName, Long serDesId) {
- String path = String.format("%s/mapping/%s", encode(schemaName), serDesId.toString());
- Boolean success = postEntity(currentSchemaRegistryTargets().schemasTarget.path(path), null, Boolean.class);
- LOG.info("Received response while mapping schema [{}] with serialzer/deserializer [{}] : [{}]", schemaName, serDesId, success);
- }
- @Override
- public T getDefaultSerializer(String type) throws SerDesException {
- Collection supportedSchemaProviders = getSupportedSchemaProviders();
- for (SchemaProviderInfo schemaProvider : supportedSchemaProviders) {
- if (schemaProvider.getType().equals(type)) {
- try {
- return (T) Class.forName(schemaProvider.getDefaultSerializerClassName()).newInstance();
- } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
- throw new SerDesException(e);
- }
- }
- }
- throw new IllegalArgumentException("No schema provider registered for the given type " + type);
- }
- @Override
- public T getDefaultDeserializer(String type) throws SerDesException {
- Collection supportedSchemaProviders = getSupportedSchemaProviders();
- for (SchemaProviderInfo schemaProvider : supportedSchemaProviders) {
- if (schemaProvider.getType().equals(type)) {
- try {
- return (T) Class.forName(schemaProvider.getDefaultDeserializerClassName()).newInstance();
- } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
- throw new SerDesException(e);
- }
- }
- }
- throw new IllegalArgumentException("No schema provider registered for the given type " + type);
- }
- @Override
- public Collection getSerDes(String schemaName) {
- String path = encode(schemaName) + "/serdes/";
- return getEntities(currentSchemaRegistryTargets().schemasTarget.path(path), SerDesInfo.class);
- }
- public T createSerializerInstance(SerDesInfo serDesInfo) {
- return createInstance(serDesInfo, true);
- }
- @Override
- public T createDeserializerInstance(SerDesInfo serDesInfo) {
- return createInstance(serDesInfo, false);
- }
- @Override
- public void close() {
- client.close();
- }
- public SchemaRegistryVersion clientVersion() {
- }
- private T createInstance(SerDesInfo serDesInfo, boolean isSerializer) {
- if (interfaceClasses == null || interfaceClasses.isEmpty()) {
- throw new IllegalArgumentException("interfaceClasses array must be neither null nor empty.");
- }
- // loading serializer, create a class loader and and keep them in cache.
- final SerDesPair serDesPair = serDesInfo.getSerDesPair();
- String fileId = serDesPair.getFileId();
- // get class loader for this file ID
- ClassLoader classLoader = classLoaderCache.getClassLoader(fileId);
- T t;
- try {
- String className =
- isSerializer ? serDesPair.getSerializerClassName() : serDesPair.getDeserializerClassName();
- Class clazz = (Class) Class.forName(className, true, classLoader);
- t = clazz.newInstance();
- List> classes = new ArrayList<>();
- for (Class> interfaceClass : interfaceClasses) {
- if (interfaceClass.isAssignableFrom(clazz)) {
- classes.add(interfaceClass);
- }
- }
- if (classes.isEmpty()) {
- throw new RuntimeException("Given Serialize/Deserializer " + className + " class does not implement any " +
- "one of the registered interfaces: " + interfaceClasses);
- }
- Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
- classes.toArray(new Class[classes.size()]),
- new ClassLoaderAwareInvocationHandler(classLoader, t));
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new SerDesException(e);
- }
- return t;
- }
- private List getEntities(WebTarget target, Class clazz) {
- String response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public String run() {
- return target.request(MediaType.APPLICATION_JSON_TYPE).get(String.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- return parseResponseAsEntities(response, clazz);
- }
- private List parseResponseAsEntities(String response, Class clazz) {
- List entities = new ArrayList<>();
- try {
- ObjectMapper mapper = new ObjectMapper();
- JsonNode node = mapper.readTree(response);
- Iterator it = node.get("entities").elements();
- while (it.hasNext()) {
- entities.add(mapper.treeToValue(it.next(), clazz));
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- return entities;
- }
- private T postEntity(WebTarget target, Object json, Class responseType) {
- String response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public String run() {
- return target.request(MediaType.APPLICATION_JSON_TYPE).post(Entity.json(json), String.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- return readEntity(response, responseType);
- }
- private T readEntity(String response, Class clazz) {
- try {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(response, clazz);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- private T getEntity(WebTarget target, Class clazz) {
- String response = null;
- try {
- response = login.doAction(new PrivilegedAction() {
- @Override
- public String run() {
- return target.request(MediaType.APPLICATION_JSON_TYPE).get(String.class);
- }
- });
- } catch (LoginException e) {
- throw new RegistryRetryableException(e);
- }
- return readEntity(response, clazz);
- }
- public static final class Configuration {
- // we may want to remove schema.registry prefix from configuration properties as these are all properties
- // given by client.
- /**
- * URL of schema registry to which this client connects to. For ex: http://localhost:9090/api/v1
- */
- public static final ConfigEntry SCHEMA_REGISTRY_URL =
- ConfigEntry.mandatory("schema.registry.url",
- String.class,
- "URL of schema registry to which this client connects to. For ex: http://localhost:9090/api/v1",
- "http://localhost:9090/api/v1",
- ConfigEntry.StringConverter.get(),
- ConfigEntry.NonEmptyStringValidator.get());
- /**
- * Default path for downloaded jars to be stored.
- */
- public static final String DEFAULT_LOCAL_JARS_PATH = "/tmp/schema-registry/local-jars";
- /**
- * Local directory path to which downloaded jars should be copied to. For ex: /tmp/schema-registry/local-jars
- */
- public static final ConfigEntry LOCAL_JAR_PATH =
- ConfigEntry.optional("schema.registry.client.local.jars.path",
- String.class,
- "URL of schema registry to which this client connects to. For ex: http://localhost:9090/api/v1",
- ConfigEntry.StringConverter.get(),
- ConfigEntry.NonEmptyStringValidator.get());
- /**
- * Default value for classloader cache size.
- */
- public static final long DEFAULT_CLASSLOADER_CACHE_SIZE = 1024L;
- /**
- * Default value for cache expiry interval in seconds.
- */
- public static final long DEFAULT_CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS = 60 * 60L;
- /**
- * Maximum size of classloader cache. Default value is {@link #DEFAULT_CLASSLOADER_CACHE_SIZE}
- * Classloaders are created for serializer/deserializer jars downloaded from schema registry and they will be locally cached.
- */
- public static final ConfigEntry CLASSLOADER_CACHE_SIZE =
- ConfigEntry.optional("schema.registry.client.class.loader.cache.size",
- Integer.class,
- "Maximum size of classloader cache",
- ConfigEntry.IntegerConverter.get(),
- ConfigEntry.PositiveNumberValidator.get());
- /**
- * Expiry interval(in seconds) of an entry in classloader cache. Default value is {@link #DEFAULT_CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS}
- * Classloaders are created for serializer/deserializer jars downloaded from schema registry and they will be locally cached.
- */
- public static final ConfigEntry CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS =
- ConfigEntry.optional("schema.registry.client.class.loader.cache.expiry.interval.secs",
- Integer.class,
- "Expiry interval(in seconds) of an entry in classloader cache",
- ConfigEntry.IntegerConverter.get(),
- ConfigEntry.PositiveNumberValidator.get());
- public static final long DEFAULT_SCHEMA_CACHE_SIZE = 1024;
- public static final long DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS = 5 * 60L;
- /**
- * Maximum size of schema version cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_SIZE}
- */
- public static final ConfigEntry SCHEMA_VERSION_CACHE_SIZE =
- ConfigEntry.optional("schema.registry.client.schema.version.cache.size",
- Integer.class,
- "Maximum size of schema version cache",
- ConfigEntry.IntegerConverter.get(),
- ConfigEntry.PositiveNumberValidator.get());
- /**
- * Expiry interval(in seconds) of an entry in schema version cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS}
- */
- public static final ConfigEntry SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS =
- ConfigEntry.optional("schema.registry.client.schema.version.cache.expiry.interval.secs",
- Integer.class,
- "Expiry interval(in seconds) of an entry in schema version cache",
- ConfigEntry.IntegerConverter.get(),
- ConfigEntry.PositiveNumberValidator.get());
- /**
- * Maximum size of schema metadata cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_SIZE}
- */
- public static final ConfigEntry SCHEMA_METADATA_CACHE_SIZE =
- ConfigEntry.optional("schema.registry.client.schema.metadata.cache.size",
- Integer.class,
- "Maximum size of schema metadata cache",
- ConfigEntry.IntegerConverter.get(),
- ConfigEntry.PositiveNumberValidator.get());
- /**
- * Expiry interval(in seconds) of an entry in schema metadata cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS}
- */
- public static final ConfigEntry SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS =
- ConfigEntry.optional("schema.registry.client.schema.metadata.cache.expiry.interval.secs",
- Integer.class,
- "Expiry interval(in seconds) of an entry in schema metadata cache",
- ConfigEntry.IntegerConverter.get(),
- ConfigEntry.PositiveNumberValidator.get());
- /**
- * Maximum size of schema text cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_SIZE}.
- * This cache has ability to store/get entries with same schema name and schema text.
- */
- public static final ConfigEntry SCHEMA_TEXT_CACHE_SIZE =
- ConfigEntry.optional("schema.registry.client.schema.text.cache.size",
- Integer.class,
- "Maximum size of schema text cache",
- ConfigEntry.IntegerConverter.get(),
- ConfigEntry.PositiveNumberValidator.get());
- /**
- * Expiry interval(in seconds) of an entry in schema text cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS}
- */
- public static final ConfigEntry SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS =
- ConfigEntry.optional("schema.registry.client.schema.text.cache.expiry.interval.secs",
- Integer.class,
- "Expiry interval(in seconds) of an entry in schema text cache.",
- ConfigEntry.IntegerConverter.get(),
- ConfigEntry.PositiveNumberValidator.get());
- /**
- *
- */
- public static final ConfigEntry URL_SELECTOR_CLASS =
- ConfigEntry.optional("schema.registry.client.url.selector",
- String.class,
- "Schema Registry URL selector class.",
- FailoverUrlSelector.class.getName(),
- ConfigEntry.StringConverter.get(),
- ConfigEntry.NonEmptyStringValidator.get());
- /**
- *
- */
- public static final ConfigEntry SASL_JAAS_CONFIG =
- ConfigEntry.optional( "sasl.jaas.config",
- String.class,
- "Schema Registry Dynamic JAAS config for SASL connection.",
- null,
- ConfigEntry.StringConverter.get(),
- ConfigEntry.NonEmptyStringValidator.get());
- // connection properties
- /**
- * Default connection timeout on connections created while connecting to schema registry.
- */
- public static final int DEFAULT_CONNECTION_TIMEOUT = 30 * 1000;
- /**
- * Default read timeout on connections created while connecting to schema registry.
- */
- public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;
- private final Map config;
- private final Map> options;
- public Configuration(Map config) {
- Field[] fields = this.getClass().getDeclaredFields();
- this.options = Collections.unmodifiableMap(buildOptions(fields));
- this.config = buildConfig(config);
- }
- private Map buildConfig(Map config) {
- Map result = new HashMap<>();
- for (Map.Entry entry : config.entrySet()) {
- String key = entry.getKey();
- Object value = entry.getValue();
- ConfigEntry configEntry = options.get(key);
- if (configEntry != null) {
- if (value != null) {
- configEntry.validator().validate((value));
- } else {
- value = configEntry.defaultValue();
- }
- }
- result.put(key, value);
- }
- return result;
- }
- private Map> buildOptions(Field[] fields) {
- Map> options = new HashMap<>();
- for (Field field : fields) {
- Class> type = field.getType();
- if (type.isAssignableFrom(ConfigEntry.class)) {
- field.setAccessible(true);
- try {
- ConfigEntry configEntry = (ConfigEntry) field.get(this);
- options.put(configEntry.name(), configEntry);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
- }
- return options;
- }
- public T getValue(String propertyKey) {
- return (T) (config.containsKey(propertyKey) ? config.get(propertyKey)
- : options.get(propertyKey).defaultValue());
- }
- public Map getConfig() {
- return Collections.unmodifiableMap(config);
- }
- public Collection> getAvailableConfigEntries() {
- return options.values();
- }
- }
- private static class SchemaDigestEntry {
- private final String name;
- private final byte[] schemaDigest;
- SchemaDigestEntry(String name, byte[] schemaDigest) {
- Objects.requireNonNull(name, "name can not be null");
- Objects.requireNonNull(schemaDigest, "schema digest can not be null");
- this.name = name;
- this.schemaDigest = schemaDigest;
- }
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- SchemaDigestEntry that = (SchemaDigestEntry) o;
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
- return Arrays.equals(schemaDigest, that.schemaDigest);
- }
- @Override
- public int hashCode() {
- int result = name != null ? name.hashCode() : 0;
- result = 31 * result + Arrays.hashCode(schemaDigest);
- return result;
- }
- }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index bd3a1a084d..a6a03cdc61 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -76,55 +76,55 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
private static final String CLIENT_SSL_PROPERTY_PREFIX = "schema.registry.client.ssl";
private final ConcurrentMap, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>();
- private final ConcurrentMap, Tuple> schemaVersionByNameCache = new ConcurrentHashMap<>();
+ private final ConcurrentMap, Tuple> schemaVersionByNameCache = new ConcurrentHashMap<>();
private final ConcurrentMap> schemaVersionByKeyCache = new ConcurrentHashMap<>();
private volatile long versionInfoCacheNanos;
static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
- .name("url")
- .displayName("Schema Registry URL")
- .description("URL of the schema registry that this Controller Service should connect to, including version. For example, http://localhost:9090/api/v1")
- .addValidator(StandardValidators.URL_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .required(true)
- .build();
+ .name("url")
+ .displayName("Schema Registry URL")
+ .description("URL of the schema registry that this Controller Service should connect to, including version. For example, http://localhost:9090/api/v1")
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
- .name("cache-size")
- .displayName("Cache Size")
- .description("Specifies how many Schemas should be cached from the Hortonworks Schema Registry")
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .defaultValue("1000")
- .required(true)
- .build();
+ .name("cache-size")
+ .displayName("Cache Size")
+ .description("Specifies how many Schemas should be cached from the Hortonworks Schema Registry")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("1000")
+ .required(true)
+ .build();
static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
- .name("cache-expiration")
- .displayName("Cache Expiration")
- .description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a "
- + "cached version of a schema will no longer be used, and the service will have to communicate with the "
- + "Hortonworks Schema Registry again in order to obtain the schema.")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("1 hour")
- .required(true)
- .build();
+ .name("cache-expiration")
+ .displayName("Cache Expiration")
+ .description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a "
+ + "cached version of a schema will no longer be used, and the service will have to communicate with the "
+ + "Hortonworks Schema Registry again in order to obtain the schema.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("1 hour")
+ .required(true)
+ .build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("ssl-context-service")
- .displayName("SSL Context Service")
- .description("Specifies the SSL Context Service to use for communicating with Schema Registry.")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
+ .name("ssl-context-service")
+ .displayName("SSL Context Service")
+ .description("Specifies the SSL Context Service to use for communicating with Schema Registry.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
- .name("kerberos-credentials-service")
- .displayName("Kerberos Credentials Service")
- .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
- .identifiesControllerService(KerberosCredentialsService.class)
- .required(false)
- .build();
+ .name("kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .required(false)
+ .build();
static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
@@ -145,6 +145,23 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
+ static final PropertyDescriptor BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
+ .name("basic-auth-username")
+ .displayName("Basic Authentication Username")
+ .description("The username to use for basic authentication when the Schema Registry is behind a proxy such as Apache Knox.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ static final PropertyDescriptor BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
+ .name("basic-auth-password")
+ .displayName("Basic Authentication Password")
+ .description("The password to use for basic authentication when the Schema Registry is behind a proxy such as Apache Knox.")
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
private volatile boolean usingKerberosWithPassword = false;
private volatile SchemaRegistryClient schemaRegistryClient;
private volatile boolean initialized;
@@ -157,6 +174,8 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
final String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = validationContext.getProperty(KERBEROS_PASSWORD).getValue();
+ final String basicAuthUsername = validationContext.getProperty(BASIC_AUTH_USERNAME).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService kerberosCredentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)
@@ -184,6 +203,23 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
+ if ((validationContext.getProperty(BASIC_AUTH_USERNAME).isSet() || validationContext.getProperty(BASIC_AUTH_PASSWORD).isSet())
+ && !validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet()) {
+ results.add(new ValidationResult.Builder()
+ .subject(BASIC_AUTH_USERNAME.getDisplayName())
+ .valid(false)
+ .explanation("SSL Context Service must be set when using basic authentication")
+ .build());
+ }
+ if ((!StringUtils.isBlank(kerberosPrincipal) || kerberosCredentialsService != null ) && !StringUtils.isBlank(basicAuthUsername)) {
+ results.add(new ValidationResult.Builder()
+ .subject(BASIC_AUTH_USERNAME.getDisplayName())
+ .valid(false)
+ .explanation("kerberos- and basic authentication cannot be configured at the same time")
+ .build());
+ }
return results;
@@ -229,6 +265,14 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER, getLogger());
usingKerberosWithPassword = true;
+ if (context.getProperty(BASIC_AUTH_USERNAME).isSet()) {
+ schemaRegistryConfig.put(SchemaRegistryClient.Configuration.AUTH_USERNAME.name(), context.getProperty(BASIC_AUTH_USERNAME).getValue());
+ }
+ if (context.getProperty(BASIC_AUTH_PASSWORD).isSet()) {
+ schemaRegistryConfig.put(SchemaRegistryClient.Configuration.AUTH_PASSWORD.name(), context.getProperty(BASIC_AUTH_PASSWORD).getValue());
+ }
private String getKeytabJaasConfig(final String principal, final String keytab) {
@@ -259,7 +303,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
propertiesBuilder.put("trustStoreType", sslContextService.getTrustStoreType());
- return Collections.unmodifiableMap(propertiesBuilder);
+ return Collections.unmodifiableMap(propertiesBuilder);
@@ -283,6 +327,8 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
+ properties.add(BASIC_AUTH_USERNAME);
+ properties.add(BASIC_AUTH_PASSWORD);
return properties;
@@ -304,7 +350,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
throws org.apache.nifi.schema.access.SchemaNotFoundException {
try {
// Try to fetch the SchemaVersionInfo from the cache.
- final Tuple nameAndBranch = new Tuple<>(schemaName, branchName);
+ final Tuple nameAndBranch = new Tuple<>(schemaName, branchName);
final Tuple timestampedVersionInfo = schemaVersionByNameCache.get(nameAndBranch);
// Determine if the timestampedVersionInfo is expired
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java
index 18eb77c277..af644a24ff 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java
@@ -16,6 +16,7 @@
package org.apache.nifi.schemaregistry.hortonworks;
+import com.hortonworks.registries.auth.Login;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.security.krb.KerberosLoginException;
@@ -25,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.LoginException;
+import java.lang.reflect.Field;
import java.util.Map;
@@ -47,17 +49,17 @@ public class SchemaRegistryClientWithKerberosPassword extends SchemaRegistryClie
protected void initializeSecurityContext() {
- final String principal = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL);
+ final String principal = getConfiguration().getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL);
if (principal == null) {
throw new IllegalArgumentException("Failed to login because principal is null");
- final String password = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD);
+ final String password = getConfiguration().getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD);
if (password == null) {
throw new IllegalArgumentException("Failed to login because password is null");
- final Object loggerObject = configuration.getValue(SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER);
+ final Object loggerObject = getConfiguration().getValue(SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER);
if (loggerObject == null) {
throw new IllegalArgumentException("Failed to login because component logger is required");
@@ -67,7 +69,8 @@ public class SchemaRegistryClientWithKerberosPassword extends SchemaRegistryClie
kerberosUser = new KerberosPasswordUser(principal, password);
- login = new KerberosUserLogin(kerberosUser, (ComponentLog) loggerObject);
+ Login login = new KerberosUserLogin(kerberosUser, (ComponentLog) loggerObject);
+ setLogin(login);
try {
@@ -76,6 +79,16 @@ public class SchemaRegistryClientWithKerberosPassword extends SchemaRegistryClie
+ public void setLogin(Login login) {
+ try {
+ Field loginField = SchemaRegistryClient.class.getDeclaredField("login");
+ loginField.setAccessible(true);
+ loginField.set(this, login);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
public void close() {
try {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistryTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistryTest.java
new file mode 100644
index 0000000000..8954b9e620
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistryTest.java
@@ -0,0 +1,107 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.hortonworks;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import static org.mockito.Mockito.when;
+public class HortonworksSchemaRegistryTest {
+ private HortonworksSchemaRegistry testSubject;
+ private TestRunner runner;
+ @Mock
+ private Processor dummyProcessor;
+ @Mock
+ private SSLContextService mockSSLContextService;
+ @Mock
+ private KerberosCredentialsService mockKerberosCredentialsService;
+ @BeforeEach
+ void setUp() throws Exception {
+ MockitoAnnotations.openMocks(this);
+ testSubject = new HortonworksSchemaRegistry();
+ runner = TestRunners.newTestRunner(dummyProcessor);
+ runner.addControllerService("hortonworks-schema-registry", testSubject);
+ when(mockSSLContextService.getIdentifier()).thenReturn("ssl-controller-service-id");
+ when(mockKerberosCredentialsService.getIdentifier()).thenReturn("kerberos-credentials-service-id");
+ }
+ @Test
+ void invalidWhenBasicUsernameWithoutSSLContextIsSet() throws Exception {
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.URL, "http://unimportant");
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.BASIC_AUTH_USERNAME, "username");
+ runner.assertNotValid(testSubject);
+ }
+ @Test
+ void validWhenBasicUsernameWithSSLContextIsSet() throws Exception {
+ addAndEnable(mockSSLContextService);
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.URL, "http://unimportant");
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.SSL_CONTEXT_SERVICE, mockSSLContextService.getIdentifier());
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.BASIC_AUTH_USERNAME, "basic username");
+ runner.assertValid(testSubject);
+ }
+ @Test
+ void invalidWhenBasicUsernameAndKerberosPrincipalBothSet() throws Exception {
+ addAndEnable(mockSSLContextService);
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.URL, "http://unimportant");
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.SSL_CONTEXT_SERVICE, mockSSLContextService.getIdentifier());
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.BASIC_AUTH_USERNAME, "basic username");
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.KERBEROS_PRINCIPAL, "kerberos principal");
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.KERBEROS_PASSWORD, "kerberos password");
+ runner.assertNotValid(testSubject);
+ }
+ @Test
+ void invalidWhenBasicUsernameAndKerberosCredentialsServivceBothSet() throws Exception {
+ addAndEnable(mockSSLContextService);
+ addAndEnable(mockKerberosCredentialsService);
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.URL, "http://unimportant");
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.SSL_CONTEXT_SERVICE, mockSSLContextService.getIdentifier());
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.KERBEROS_CREDENTIALS_SERVICE, mockKerberosCredentialsService.getIdentifier());
+ runner.setProperty(testSubject, HortonworksSchemaRegistry.BASIC_AUTH_USERNAME, "basic username");
+ runner.assertNotValid(testSubject);
+ }
+ private void addAndEnable(ControllerService service) throws InitializationException {
+ runner.addControllerService(service.getIdentifier(), service);
+ runner.enableControllerService(service);
+ }