NIFI-10025 Removed in-project copy of com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient, relying on original instead. Added basic auth support for HortonworksSchemaRegistry.

This closes #6055.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2022-05-18 20:32:59 +02:00 committed by Peter Turcsanyi
parent 1442dcef23
commit b6a32a9c5d
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
4 changed files with 208 additions and 1552 deletions

View File

@ -76,55 +76,55 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
private static final String CLIENT_SSL_PROPERTY_PREFIX = "schema.registry.client.ssl";
private final ConcurrentMap<Tuple<SchemaIdentifier, String>, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Tuple<String,String>, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>();
private final ConcurrentMap<Tuple<String, String>, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>();
private final ConcurrentMap<SchemaVersionKey, Tuple<SchemaVersionInfo, Long>> schemaVersionByKeyCache = new ConcurrentHashMap<>();
private volatile long versionInfoCacheNanos;
static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
.name("url")
.displayName("Schema Registry URL")
.description("URL of the schema registry that this Controller Service should connect to, including version. For example, http://localhost:9090/api/v1")
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.build();
.name("url")
.displayName("Schema Registry URL")
.description("URL of the schema registry that this Controller Service should connect to, including version. For example, http://localhost:9090/api/v1")
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.build();
static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
.name("cache-size")
.displayName("Cache Size")
.description("Specifies how many Schemas should be cached from the Hortonworks Schema Registry")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.defaultValue("1000")
.required(true)
.build();
.name("cache-size")
.displayName("Cache Size")
.description("Specifies how many Schemas should be cached from the Hortonworks Schema Registry")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.defaultValue("1000")
.required(true)
.build();
static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
.name("cache-expiration")
.displayName("Cache Expiration")
.description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a "
+ "cached version of a schema will no longer be used, and the service will have to communicate with the "
+ "Hortonworks Schema Registry again in order to obtain the schema.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("1 hour")
.required(true)
.build();
.name("cache-expiration")
.displayName("Cache Expiration")
.description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a "
+ "cached version of a schema will no longer be used, and the service will have to communicate with the "
+ "Hortonworks Schema Registry again in order to obtain the schema.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("1 hour")
.required(true)
.build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl-context-service")
.displayName("SSL Context Service")
.description("Specifies the SSL Context Service to use for communicating with Schema Registry.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
.name("ssl-context-service")
.displayName("SSL Context Service")
.description("Specifies the SSL Context Service to use for communicating with Schema Registry.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("kerberos-principal")
@ -145,6 +145,23 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
.name("basic-auth-username")
.displayName("Basic Authentication Username")
.description("The username to use for basic authentication when the Schema Registry is behind a proxy such as Apache Knox.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(SSL_CONTEXT_SERVICE)
.build();
static final PropertyDescriptor BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
.name("basic-auth-password")
.displayName("Basic Authentication Password")
.description("The password to use for basic authentication when the Schema Registry is behind a proxy such as Apache Knox.")
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(SSL_CONTEXT_SERVICE)
.build();
private volatile boolean usingKerberosWithPassword = false;
private volatile SchemaRegistryClient schemaRegistryClient;
private volatile boolean initialized;
@ -157,6 +174,8 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
final String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = validationContext.getProperty(KERBEROS_PASSWORD).getValue();
final String basicAuthUsername = validationContext.getProperty(BASIC_AUTH_USERNAME).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService kerberosCredentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)
.asControllerService(KerberosCredentialsService.class);
@ -184,6 +203,23 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.build());
}
if ((validationContext.getProperty(BASIC_AUTH_USERNAME).isSet() || validationContext.getProperty(BASIC_AUTH_PASSWORD).isSet())
&& !validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet()) {
results.add(new ValidationResult.Builder()
.subject(BASIC_AUTH_USERNAME.getDisplayName())
.valid(false)
.explanation("SSL Context Service must be set when using basic authentication")
.build());
}
if ((!StringUtils.isBlank(kerberosPrincipal) || kerberosCredentialsService != null ) && !StringUtils.isBlank(basicAuthUsername)) {
results.add(new ValidationResult.Builder()
.subject(BASIC_AUTH_USERNAME.getDisplayName())
.valid(false)
.explanation("kerberos- and basic authentication cannot be configured at the same time")
.build());
}
return results;
}
@ -229,6 +265,14 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER, getLogger());
usingKerberosWithPassword = true;
}
if (context.getProperty(BASIC_AUTH_USERNAME).isSet()) {
schemaRegistryConfig.put(SchemaRegistryClient.Configuration.AUTH_USERNAME.name(), context.getProperty(BASIC_AUTH_USERNAME).getValue());
}
if (context.getProperty(BASIC_AUTH_PASSWORD).isSet()) {
schemaRegistryConfig.put(SchemaRegistryClient.Configuration.AUTH_PASSWORD.name(), context.getProperty(BASIC_AUTH_PASSWORD).getValue());
}
}
private String getKeytabJaasConfig(final String principal, final String keytab) {
@ -259,7 +303,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
propertiesBuilder.put("trustStoreType", sslContextService.getTrustStoreType());
}
}
return Collections.unmodifiableMap(propertiesBuilder);
return Collections.unmodifiableMap(propertiesBuilder);
}
@OnDisabled
@ -283,6 +327,8 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_PASSWORD);
properties.add(BASIC_AUTH_USERNAME);
properties.add(BASIC_AUTH_PASSWORD);
return properties;
}
@ -304,7 +350,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
throws org.apache.nifi.schema.access.SchemaNotFoundException {
try {
// Try to fetch the SchemaVersionInfo from the cache.
final Tuple<String,String> nameAndBranch = new Tuple<>(schemaName, branchName);
final Tuple<String, String> nameAndBranch = new Tuple<>(schemaName, branchName);
final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByNameCache.get(nameAndBranch);
// Determine if the timestampedVersionInfo is expired

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.schemaregistry.hortonworks;
import com.hortonworks.registries.auth.Login;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.security.krb.KerberosLoginException;
@ -25,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.LoginException;
import java.lang.reflect.Field;
import java.util.Map;
/**
@ -47,17 +49,17 @@ public class SchemaRegistryClientWithKerberosPassword extends SchemaRegistryClie
@Override
protected void initializeSecurityContext() {
final String principal = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL);
final String principal = getConfiguration().getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL);
if (principal == null) {
throw new IllegalArgumentException("Failed to login because principal is null");
}
final String password = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD);
final String password = getConfiguration().getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD);
if (password == null) {
throw new IllegalArgumentException("Failed to login because password is null");
}
final Object loggerObject = configuration.getValue(SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER);
final Object loggerObject = getConfiguration().getValue(SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER);
if (loggerObject == null) {
throw new IllegalArgumentException("Failed to login because component logger is required");
}
@ -67,7 +69,8 @@ public class SchemaRegistryClientWithKerberosPassword extends SchemaRegistryClie
}
kerberosUser = new KerberosPasswordUser(principal, password);
login = new KerberosUserLogin(kerberosUser, (ComponentLog) loggerObject);
Login login = new KerberosUserLogin(kerberosUser, (ComponentLog) loggerObject);
setLogin(login);
try {
login.login();
@ -76,6 +79,16 @@ public class SchemaRegistryClientWithKerberosPassword extends SchemaRegistryClie
}
}
public void setLogin(Login login) {
try {
Field loginField = SchemaRegistryClient.class.getDeclaredField("login");
loginField.setAccessible(true);
loginField.set(this, login);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
try {

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schemaregistry.hortonworks;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.mockito.Mockito.when;
public class HortonworksSchemaRegistryTest {
private HortonworksSchemaRegistry testSubject;
private TestRunner runner;
@Mock
private Processor dummyProcessor;
@Mock
private SSLContextService mockSSLContextService;
@Mock
private KerberosCredentialsService mockKerberosCredentialsService;
@BeforeEach
void setUp() throws Exception {
MockitoAnnotations.openMocks(this);
testSubject = new HortonworksSchemaRegistry();
runner = TestRunners.newTestRunner(dummyProcessor);
runner.addControllerService("hortonworks-schema-registry", testSubject);
when(mockSSLContextService.getIdentifier()).thenReturn("ssl-controller-service-id");
when(mockKerberosCredentialsService.getIdentifier()).thenReturn("kerberos-credentials-service-id");
}
@Test
void invalidWhenBasicUsernameWithoutSSLContextIsSet() throws Exception {
runner.setProperty(testSubject, HortonworksSchemaRegistry.URL, "http://unimportant");
runner.setProperty(testSubject, HortonworksSchemaRegistry.BASIC_AUTH_USERNAME, "username");
runner.assertNotValid(testSubject);
}
@Test
void validWhenBasicUsernameWithSSLContextIsSet() throws Exception {
addAndEnable(mockSSLContextService);
runner.setProperty(testSubject, HortonworksSchemaRegistry.URL, "http://unimportant");
runner.setProperty(testSubject, HortonworksSchemaRegistry.SSL_CONTEXT_SERVICE, mockSSLContextService.getIdentifier());
runner.setProperty(testSubject, HortonworksSchemaRegistry.BASIC_AUTH_USERNAME, "basic username");
runner.assertValid(testSubject);
}
@Test
void invalidWhenBasicUsernameAndKerberosPrincipalBothSet() throws Exception {
addAndEnable(mockSSLContextService);
runner.setProperty(testSubject, HortonworksSchemaRegistry.URL, "http://unimportant");
runner.setProperty(testSubject, HortonworksSchemaRegistry.SSL_CONTEXT_SERVICE, mockSSLContextService.getIdentifier());
runner.setProperty(testSubject, HortonworksSchemaRegistry.BASIC_AUTH_USERNAME, "basic username");
runner.setProperty(testSubject, HortonworksSchemaRegistry.KERBEROS_PRINCIPAL, "kerberos principal");
runner.setProperty(testSubject, HortonworksSchemaRegistry.KERBEROS_PASSWORD, "kerberos password");
runner.assertNotValid(testSubject);
}
@Test
void invalidWhenBasicUsernameAndKerberosCredentialsServivceBothSet() throws Exception {
addAndEnable(mockSSLContextService);
addAndEnable(mockKerberosCredentialsService);
runner.setProperty(testSubject, HortonworksSchemaRegistry.URL, "http://unimportant");
runner.setProperty(testSubject, HortonworksSchemaRegistry.SSL_CONTEXT_SERVICE, mockSSLContextService.getIdentifier());
runner.setProperty(testSubject, HortonworksSchemaRegistry.KERBEROS_CREDENTIALS_SERVICE, mockKerberosCredentialsService.getIdentifier());
runner.setProperty(testSubject, HortonworksSchemaRegistry.BASIC_AUTH_USERNAME, "basic username");
runner.assertNotValid(testSubject);
}
private void addAndEnable(ControllerService service) throws InitializationException {
runner.addControllerService(service.getIdentifier(), service);
runner.enableControllerService(service);
}
}