mirror of https://github.com/apache/nifi.git
NIFI-11202 Removed deprecated Kerberos properties from Kafka 2_6
This closes #6978 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
dbc627e0b0
commit
ed1cee4520
|
@ -313,10 +313,8 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
|
|||
descriptors.add(HONOR_TRANSACTIONS);
|
||||
descriptors.add(SECURITY_PROTOCOL);
|
||||
descriptors.add(SASL_MECHANISM);
|
||||
descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
descriptors.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
|
||||
descriptors.add(KERBEROS_SERVICE_NAME);
|
||||
descriptors.add(KERBEROS_PRINCIPAL);
|
||||
descriptors.add(KERBEROS_KEYTAB);
|
||||
descriptors.add(SASL_USERNAME);
|
||||
descriptors.add(SASL_PASSWORD);
|
||||
descriptors.add(TOKEN_AUTHENTICATION);
|
||||
|
|
|
@ -266,11 +266,8 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
|
|||
descriptors.add(SEPARATE_BY_KEY);
|
||||
descriptors.add(SECURITY_PROTOCOL);
|
||||
descriptors.add(SASL_MECHANISM);
|
||||
descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
descriptors.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
|
||||
descriptors.add(KERBEROS_SERVICE_NAME);
|
||||
descriptors.add(KERBEROS_PRINCIPAL);
|
||||
descriptors.add(KERBEROS_KEYTAB);
|
||||
descriptors.add(SASL_USERNAME);
|
||||
descriptors.add(SASL_PASSWORD);
|
||||
descriptors.add(TOKEN_AUTHENTICATION);
|
||||
|
|
|
@ -342,11 +342,8 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
|
|||
properties.add(MESSAGE_HEADER_ENCODING);
|
||||
properties.add(SECURITY_PROTOCOL);
|
||||
properties.add(SASL_MECHANISM);
|
||||
properties.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
properties.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
|
||||
properties.add(KERBEROS_SERVICE_NAME);
|
||||
properties.add(KERBEROS_PRINCIPAL);
|
||||
properties.add(KERBEROS_KEYTAB);
|
||||
properties.add(SASL_USERNAME);
|
||||
properties.add(SASL_PASSWORD);
|
||||
properties.add(TOKEN_AUTHENTICATION);
|
||||
|
|
|
@ -299,11 +299,8 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
|
|||
properties.add(MESSAGE_HEADER_ENCODING);
|
||||
properties.add(SECURITY_PROTOCOL);
|
||||
properties.add(SASL_MECHANISM);
|
||||
properties.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
properties.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
|
||||
properties.add(KERBEROS_SERVICE_NAME);
|
||||
properties.add(KERBEROS_PRINCIPAL);
|
||||
properties.add(KERBEROS_KEYTAB);
|
||||
properties.add(SASL_USERNAME);
|
||||
properties.add(SASL_PASSWORD);
|
||||
properties.add(AWS_PROFILE_NAME);
|
||||
|
|
|
@ -170,7 +170,7 @@ public class KafkaRecordSink_2_6 extends AbstractControllerService implements Ka
|
|||
properties.add(DELIVERY_GUARANTEE);
|
||||
properties.add(MESSAGE_HEADER_ENCODING);
|
||||
properties.add(SECURITY_PROTOCOL);
|
||||
properties.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
properties.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
|
||||
properties.add(KERBEROS_SERVICE_NAME);
|
||||
properties.add(SSL_CONTEXT_SERVICE);
|
||||
properties.add(MAX_REQUEST_SIZE);
|
||||
|
|
|
@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.nifi.kafka.shared.property.SaslMechanism;
|
||||
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
|
||||
|
@ -159,7 +161,7 @@ public class TestConsumeKafkaRecord_2_6 {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void validateGetErrorMessages() throws Exception {
|
||||
public void validateGetErrorMessages() {
|
||||
String groupName = "validateGetErrorMessages";
|
||||
|
||||
when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
|
||||
|
@ -181,7 +183,7 @@ public class TestConsumeKafkaRecord_2_6 {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testJaasConfigurationWithDefaultMechanism() {
|
||||
public void testJaasConfigurationWithDefaultMechanism() throws InitializationException {
|
||||
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
|
||||
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
|
||||
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
|
||||
|
@ -192,13 +194,8 @@ public class TestConsumeKafkaRecord_2_6 {
|
|||
runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_SERVICE_NAME, "kafka");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_PRINCIPAL, "nifi@APACHE.COM");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_KEYTAB, "not.A.File");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_KEYTAB, "src/test/resources/server.properties");
|
||||
final KerberosUserService kerberosUserService = enableKerberosUserService(runner);
|
||||
runner.setProperty(ConsumeKafka_2_6.SELF_CONTAINED_KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
|
@ -278,4 +275,12 @@ public class TestConsumeKafkaRecord_2_6 {
|
|||
runner.assertValid();
|
||||
}
|
||||
|
||||
private SelfContainedKerberosUserService enableKerberosUserService(final TestRunner runner) throws InitializationException {
|
||||
final SelfContainedKerberosUserService kerberosUserService = mock(SelfContainedKerberosUserService.class);
|
||||
when(kerberosUserService.getIdentifier()).thenReturn("userService1");
|
||||
runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
|
||||
runner.enableControllerService(kerberosUserService);
|
||||
return kerberosUserService;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.nifi.kafka.shared.property.SaslMechanism;
|
||||
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
|
@ -100,36 +99,12 @@ public class TestConsumeKafka_2_6 {
|
|||
runner.setProperty(ConsumeKafka_2_6.KERBEROS_SERVICE_NAME, "kafka");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setProperty(ConsumeKafka_2_6.KERBEROS_PRINCIPAL, "nifi@APACHE.COM");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB, "not.A.File");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB, "src/test/resources/server.properties");
|
||||
runner.assertValid();
|
||||
|
||||
runner.setVariable("keytab", "src/test/resources/server.properties");
|
||||
runner.setVariable("principal", "nifi@APACHE.COM");
|
||||
runner.setVariable("service", "kafka");
|
||||
runner.setProperty(ConsumeKafka_2_6.KERBEROS_PRINCIPAL, "${principal}");
|
||||
runner.setProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB, "${keytab}");
|
||||
runner.setProperty(ConsumeKafka_2_6.KERBEROS_SERVICE_NAME, "${service}");
|
||||
runner.assertValid();
|
||||
|
||||
final KerberosUserService kerberosUserService = enableKerberosUserService(runner);
|
||||
runner.setProperty(ConsumeKafka_2_6.SELF_CONTAINED_KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.removeProperty(ConsumeKafka_2_6.KERBEROS_PRINCIPAL);
|
||||
runner.removeProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB);
|
||||
runner.assertValid();
|
||||
|
||||
final KerberosCredentialsService kerberosCredentialsService = enabledKerberosCredentialsService(runner);
|
||||
runner.setProperty(ConsumeKafka_2_6.KERBEROS_CREDENTIALS_SERVICE, kerberosCredentialsService.getIdentifier());
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.removeProperty(ConsumeKafka_2_6.SELF_CONTAINED_KERBEROS_USER_SERVICE);
|
||||
runner.setVariable("service", "kafka");
|
||||
runner.setProperty(ConsumeKafka_2_6.KERBEROS_SERVICE_NAME, "${service}");
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
|
@ -141,15 +116,4 @@ public class TestConsumeKafka_2_6 {
|
|||
return kerberosUserService;
|
||||
}
|
||||
|
||||
private KerberosCredentialsService enabledKerberosCredentialsService(final TestRunner runner) throws InitializationException {
|
||||
final KerberosCredentialsService credentialsService = mock(KerberosCredentialsService.class);
|
||||
when(credentialsService.getIdentifier()).thenReturn("credsService1");
|
||||
when(credentialsService.getPrincipal()).thenReturn("principal1");
|
||||
when(credentialsService.getKeytab()).thenReturn("keytab1");
|
||||
|
||||
runner.addControllerService(credentialsService.getIdentifier(), credentialsService);
|
||||
runner.enableControllerService(credentialsService);
|
||||
return credentialsService;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,9 +27,7 @@ import org.apache.nifi.components.PropertyValue;
|
|||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
|
||||
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.record.sink.RecordSinkService;
|
||||
|
@ -163,8 +161,6 @@ public class TestKafkaRecordSink_2_6 {
|
|||
when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer);
|
||||
when(context.getProperty(KafkaRecordSink_2_6.SSL_CONTEXT_SERVICE)).thenReturn(pValue);
|
||||
when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
|
||||
when(context.getProperty(KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE)).thenReturn(pValue);
|
||||
when(pValue.asControllerService(KerberosCredentialsService.class)).thenReturn(null);
|
||||
|
||||
final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
|
||||
task.initialize(initContext);
|
||||
|
|
|
@ -17,12 +17,9 @@
|
|||
package org.apache.nifi.kafka.shared.component;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.resource.ResourceCardinality;
|
||||
import org.apache.nifi.components.resource.ResourceType;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.kafka.shared.property.SaslMechanism;
|
||||
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
|
@ -137,32 +134,6 @@ public interface KafkaClientComponent {
|
|||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||
.name("sasl.kerberos.principal")
|
||||
.displayName("Kerberos Principal")
|
||||
.description("Principal used for authentication with Kerberos")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
|
||||
.name("sasl.kerberos.keytab")
|
||||
.displayName("Kerberos Keytab")
|
||||
.description("Keytab credentials used for authentication with Kerberos")
|
||||
.required(false)
|
||||
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-credentials-service")
|
||||
.displayName("Kerberos Credentials Service")
|
||||
.description("Service supporting generalized credentials authentication with Kerberos")
|
||||
.identifiesControllerService(KerberosCredentialsService.class)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor SELF_CONTAINED_KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-user-service")
|
||||
.displayName("Kerberos User Service")
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.kafka.shared.login;
|
||||
|
||||
import org.apache.nifi.context.PropertyContext;
|
||||
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
|
||||
/**
|
||||
* Kerberos Login Module implementation of configuration provider
|
||||
*/
|
||||
public class KerberosCredentialsLoginConfigProvider implements LoginConfigProvider {
|
||||
private static final String MODULE_CLASS_NAME = "com.sun.security.auth.module.Krb5LoginModule";
|
||||
|
||||
private static final String FORMAT = "%s required renewTicket=true useKeyTab=true serviceName=\"%s\" principal=\"%s\" keyTab=\"%s\";";
|
||||
|
||||
/**
|
||||
* Get JAAS configuration using configured Kerberos credentials
|
||||
*
|
||||
* @param context Property Context
|
||||
* @return JAAS configuration with Kerberos Login Module
|
||||
*/
|
||||
@Override
|
||||
public String getConfiguration(final PropertyContext context) {
|
||||
final String principal;
|
||||
final String keyTab;
|
||||
|
||||
final KerberosCredentialsService credentialsService = context.getProperty(KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
if (credentialsService == null) {
|
||||
principal = context.getProperty(KafkaClientComponent.KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
keyTab = context.getProperty(KafkaClientComponent.KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
|
||||
} else {
|
||||
principal = credentialsService.getPrincipal();
|
||||
keyTab = credentialsService.getKeytab();
|
||||
}
|
||||
|
||||
final String serviceName = context.getProperty(KafkaClientComponent.KERBEROS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
|
||||
return String.format(FORMAT, MODULE_CLASS_NAME, serviceName, principal, keyTab);
|
||||
}
|
||||
}
|
|
@ -16,16 +16,12 @@
|
|||
*/
|
||||
package org.apache.nifi.kafka.shared.login;
|
||||
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.context.PropertyContext;
|
||||
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
|
||||
|
||||
/**
|
||||
* Kerberos Delegating Login Module implementation of configuration provider
|
||||
*/
|
||||
public class KerberosDelegatingLoginConfigProvider implements LoginConfigProvider {
|
||||
private static final LoginConfigProvider CREDENTIALS_PROVIDER = new KerberosCredentialsLoginConfigProvider();
|
||||
|
||||
private static final LoginConfigProvider USER_SERVICE_PROVIDER = new KerberosUserServiceLoginConfigProvider();
|
||||
|
||||
/**
|
||||
|
@ -36,15 +32,6 @@ public class KerberosDelegatingLoginConfigProvider implements LoginConfigProvide
|
|||
*/
|
||||
@Override
|
||||
public String getConfiguration(final PropertyContext context) {
|
||||
final PropertyValue userServiceProperty = context.getProperty(KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE);
|
||||
|
||||
final String configuration;
|
||||
if (userServiceProperty.isSet()) {
|
||||
configuration = USER_SERVICE_PROVIDER.getConfiguration(context);
|
||||
} else {
|
||||
configuration = CREDENTIALS_PROVIDER.getConfiguration(context);
|
||||
}
|
||||
|
||||
return configuration;
|
||||
return USER_SERVICE_PROVIDER.getConfiguration(context);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
|
|||
import org.apache.nifi.kafka.shared.property.SaslMechanism;
|
||||
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
|
||||
import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -34,9 +33,6 @@ import java.util.List;
|
|||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE;
|
||||
import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_KEYTAB;
|
||||
import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_PRINCIPAL;
|
||||
import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_SERVICE_NAME;
|
||||
import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.SASL_MECHANISM;
|
||||
import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.SASL_PASSWORD;
|
||||
|
@ -51,8 +47,6 @@ public class KafkaClientCustomValidationFunction implements Function<ValidationC
|
|||
|
||||
static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
|
||||
|
||||
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
|
||||
|
||||
private static final String JNDI_LOGIN_MODULE_CLASS = "JndiLoginModule";
|
||||
|
||||
private static final String JND_LOGIN_MODULE_EXPLANATION = "The JndiLoginModule is not allowed in the JAAS configuration";
|
||||
|
@ -72,7 +66,6 @@ public class KafkaClientCustomValidationFunction implements Function<ValidationC
|
|||
public Collection<ValidationResult> apply(final ValidationContext validationContext) {
|
||||
final Collection<ValidationResult> results = new ArrayList<>();
|
||||
validateLoginModule(validationContext, results);
|
||||
validateKerberosServices(validationContext, results);
|
||||
validateKerberosCredentials(validationContext, results);
|
||||
validateUsernamePassword(validationContext, results);
|
||||
validateAwsMskIamMechanism(validationContext, results);
|
||||
|
@ -100,67 +93,6 @@ public class KafkaClientCustomValidationFunction implements Function<ValidationC
|
|||
}
|
||||
}
|
||||
|
||||
private void validateKerberosServices(final ValidationContext validationContext, final Collection<ValidationResult> results) {
|
||||
final PropertyValue userServiceProperty = validationContext.getProperty(SELF_CONTAINED_KERBEROS_USER_SERVICE);
|
||||
final PropertyValue credentialsServiceProperty = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE);
|
||||
final String principal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String keyTab = validationContext.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
|
||||
|
||||
if (userServiceProperty.isSet()) {
|
||||
if (credentialsServiceProperty.isSet()) {
|
||||
final String explanation = String.format("Cannot configure both [%s] and [%s]",
|
||||
SELF_CONTAINED_KERBEROS_USER_SERVICE.getDisplayName(),
|
||||
KERBEROS_CREDENTIALS_SERVICE.getDisplayName()
|
||||
);
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(explanation)
|
||||
.build());
|
||||
}
|
||||
|
||||
if (isNotEmpty(principal) || isNotEmpty(keyTab)) {
|
||||
final String explanation = String.format("Cannot configure [%s] with [%s] or [%s]",
|
||||
SELF_CONTAINED_KERBEROS_USER_SERVICE.getDisplayName(),
|
||||
KERBEROS_PRINCIPAL.getDisplayName(),
|
||||
KERBEROS_KEYTAB.getDisplayName()
|
||||
);
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(SELF_CONTAINED_KERBEROS_USER_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(explanation)
|
||||
.build());
|
||||
}
|
||||
} else if (credentialsServiceProperty.isSet()) {
|
||||
if (isNotEmpty(principal) || isNotEmpty(keyTab)) {
|
||||
final String explanation = String.format("Cannot configure [%s] with [%s] or [%s]",
|
||||
KERBEROS_CREDENTIALS_SERVICE.getDisplayName(),
|
||||
KERBEROS_PRINCIPAL.getDisplayName(),
|
||||
KERBEROS_KEYTAB.getDisplayName()
|
||||
);
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(explanation)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
final String allowExplicitKeytab = System.getenv(ALLOW_EXPLICIT_KEYTAB);
|
||||
if (Boolean.FALSE.toString().equalsIgnoreCase(allowExplicitKeytab) && (isNotEmpty(principal) || isNotEmpty(keyTab))) {
|
||||
final String explanation = String.format("Environment Variable [%s] disables configuring [%s] and [%s] properties",
|
||||
ALLOW_EXPLICIT_KEYTAB,
|
||||
KERBEROS_PRINCIPAL.getDisplayName(),
|
||||
KERBEROS_KEYTAB.getDisplayName()
|
||||
);
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_PRINCIPAL.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(explanation)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
private void validateKerberosCredentials(final ValidationContext validationContext, final Collection<ValidationResult> results) {
|
||||
final String saslMechanism = validationContext.getProperty(SASL_MECHANISM).getValue();
|
||||
final String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
|
||||
|
@ -176,29 +108,10 @@ public class KafkaClientCustomValidationFunction implements Function<ValidationC
|
|||
.build());
|
||||
}
|
||||
|
||||
final String principal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String keyTab = validationContext.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
|
||||
final String systemLoginConfig = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
|
||||
|
||||
if (isEmpty(principal) && isNotEmpty(keyTab)) {
|
||||
final String explanation = String.format("[%s] required when configuring [%s]", KERBEROS_KEYTAB.getDisplayName(), KERBEROS_PRINCIPAL.getDisplayName());
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_PRINCIPAL.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(explanation)
|
||||
.build());
|
||||
} else if (isNotEmpty(principal) && isEmpty(keyTab)) {
|
||||
final String explanation = String.format("[%s] required when configuring [%s]", KERBEROS_PRINCIPAL.getDisplayName(), KERBEROS_KEYTAB.getDisplayName());
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_KEYTAB.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(explanation)
|
||||
.build());
|
||||
}
|
||||
|
||||
final KerberosUserService userService = validationContext.getProperty(SELF_CONTAINED_KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
|
||||
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
if (userService == null && credentialsService == null && isEmpty(principal) && isEmpty(keyTab) && isEmpty(systemLoginConfig)) {
|
||||
if (userService == null && isEmpty(systemLoginConfig)) {
|
||||
final String explanation = String.format("Kerberos Credentials not found in component properties or System Property [%s]", JAVA_SECURITY_AUTH_LOGIN_CONFIG);
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(SASL_MECHANISM.getDisplayName())
|
||||
|
|
|
@ -98,39 +98,6 @@ class KafkaClientCustomValidationFunctionTest {
|
|||
assertTrue(results.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testApplyUserServiceWithCredentialsServiceInvalid() {
|
||||
runner.setProperty(KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE, KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE.getName());
|
||||
runner.setProperty(KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE, KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE.getName());
|
||||
|
||||
final ValidationContext validationContext = getValidationContext();
|
||||
final Collection<ValidationResult> results = validationFunction.apply(validationContext);
|
||||
|
||||
assertPropertyValidationResultFound(results, KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE.getDisplayName());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testApplyCredentialsServiceWithPrincipalInvalid() {
|
||||
runner.setProperty(KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE, KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE.getName());
|
||||
runner.setProperty(KafkaClientComponent.KERBEROS_PRINCIPAL, KafkaClientComponent.KERBEROS_PRINCIPAL.getName());
|
||||
|
||||
final ValidationContext validationContext = getValidationContext();
|
||||
final Collection<ValidationResult> results = validationFunction.apply(validationContext);
|
||||
|
||||
assertPropertyValidationResultFound(results, KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE.getDisplayName());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testApplyPrincipalKeyTabValid() {
|
||||
runner.setProperty(KafkaClientComponent.KERBEROS_PRINCIPAL, KafkaClientComponent.KERBEROS_PRINCIPAL.getName());
|
||||
runner.setProperty(KafkaClientComponent.KERBEROS_KEYTAB, KafkaClientComponent.KERBEROS_KEYTAB.getName());
|
||||
|
||||
final ValidationContext validationContext = getValidationContext();
|
||||
final Collection<ValidationResult> results = validationFunction.apply(validationContext);
|
||||
|
||||
assertTrue(results.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testApplyPlainUsernameWithoutPasswordInvalid() {
|
||||
runner.setProperty(KafkaClientComponent.SASL_USERNAME, KafkaClientComponent.SASL_USERNAME.getName());
|
||||
|
|
Loading…
Reference in New Issue