NIFI-10919 Corrected SCRAM SASL Mechanism for Kafka Components

This closes #6743
Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
exceptionfactory 2022-12-01 08:15:20 -06:00 committed by Paul Grey
parent 570fc7f1fa
commit c3b0e1a790
No known key found for this signature in database
GPG Key ID: 8DDF32B9C7EE39D0
6 changed files with 171 additions and 8 deletions

View File

@ -47,7 +47,7 @@ public class DelegatingLoginConfigProvider implements LoginConfigProvider {
@Override
public String getConfiguration(final PropertyContext context) {
final String saslMechanismProperty = context.getProperty(KafkaClientComponent.SASL_MECHANISM).getValue();
final SaslMechanism saslMechanism = SaslMechanism.valueOf(saslMechanismProperty);
final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(saslMechanismProperty);
final LoginConfigProvider loginConfigProvider = PROVIDERS.getOrDefault(saslMechanism, SCRAM_PROVIDER);
return loginConfigProvider.getConfiguration(context);
}

View File

@ -18,6 +18,9 @@ package org.apache.nifi.kafka.shared.property;
import org.apache.nifi.components.DescribedValue;
import java.util.Arrays;
import java.util.Optional;
/**
* Enumeration of supported Kafka SASL Mechanisms
*/
@ -42,6 +45,13 @@ public enum SaslMechanism implements DescribedValue {
this.description = description;
}
public static SaslMechanism getSaslMechanism(final String value) {
final Optional<SaslMechanism> foundSaslMechanism = Arrays.stream(SaslMechanism.values())
.filter(saslMechanism -> saslMechanism.getValue().equals(value))
.findFirst();
return foundSaslMechanism.orElseThrow(() -> new IllegalArgumentException(String.format("SaslMechanism value [%s] not found", value)));
}
@Override
public String getValue() {
return value;

View File

@ -19,6 +19,7 @@ package org.apache.nifi.kafka.shared.property.provider;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -58,9 +59,12 @@ public class StandardKafkaPropertyNameProvider implements KafkaPropertyNameProvi
final Set<String> propertyNames = new LinkedHashSet<>();
for (final String propertyClassName : PROPERTY_CLASSES) {
final Class<?> propertyClass = getClass(propertyClassName);
final Set<String> classPropertyNames = getStaticStringPropertyNames(propertyClass);
propertyNames.addAll(classPropertyNames);
final Optional<Class<?>> propertyClassFound = findClass(propertyClassName);
if (propertyClassFound.isPresent()) {
final Class<?> propertyClass = propertyClassFound.get();
final Set<String> classPropertyNames = getStaticStringPropertyNames(propertyClass);
propertyNames.addAll(classPropertyNames);
}
}
return propertyNames;
@ -93,11 +97,11 @@ public class StandardKafkaPropertyNameProvider implements KafkaPropertyNameProvi
}
}
private static Class<?> getClass(final String className) {
private static Optional<Class<?>> findClass(final String className) {
try {
return Class.forName(className);
return Optional.of(Class.forName(className));
} catch (final ClassNotFoundException e) {
throw new IllegalStateException("Kafka Configuration Class not found", e);
return Optional.empty();
}
}
}

View File

@ -83,7 +83,7 @@ public class StandardKafkaPropertyProvider implements KafkaPropertyProvider {
final String loginConfig = LOGIN_CONFIG_PROVIDER.getConfiguration(context);
properties.put(SASL_JAAS_CONFIG.getProperty(), loginConfig);
final SaslMechanism saslMechanism = SaslMechanism.valueOf(context.getProperty(SASL_MECHANISM).getValue());
final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(context.getProperty(SASL_MECHANISM).getValue());
if (SaslMechanism.GSSAPI == saslMechanism && isCustomKerberosLoginFound()) {
properties.put(SASL_LOGIN_CLASS.getProperty(), SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.shared.login;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class DelegatingLoginConfigProviderTest {
private static final String PLAIN_LOGIN_MODULE = "PlainLoginModule";
private static final String SCRAM_LOGIN_MODULE = "ScramLoginModule";
DelegatingLoginConfigProvider provider;
TestRunner runner;
@BeforeEach
void setProvider() {
provider = new DelegatingLoginConfigProvider();
runner = TestRunners.newTestRunner(NoOpProcessor.class);
runner.setValidateExpressionUsage(false);
}
@Test
void testGetConfigurationPlain() {
runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.PLAIN.getValue());
final PropertyContext propertyContext = runner.getProcessContext();
final String configuration = provider.getConfiguration(propertyContext);
assertNotNull(configuration);
assertTrue(configuration.contains(PLAIN_LOGIN_MODULE), "PLAIN configuration not found");
}
@Test
void testGetConfigurationScram() {
runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_512.getValue());
final PropertyContext propertyContext = runner.getProcessContext();
final String configuration = provider.getConfiguration(propertyContext);
assertNotNull(configuration);
assertTrue(configuration.contains(SCRAM_LOGIN_MODULE), "SCRAM configuration not found");
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.shared.property.provider;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class StandardKafkaPropertyProviderTest {
private static final String SCRAM_LOGIN_MODULE = "ScramLoginModule";
StandardKafkaPropertyProvider provider;
TestRunner runner;
@BeforeEach
void setProvider() {
provider = new StandardKafkaPropertyProvider(String.class);
runner = TestRunners.newTestRunner(NoOpProcessor.class);
runner.setValidateExpressionUsage(false);
}
@Test
void testGetProperties() {
final SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL, securityProtocol.name());
final PropertyContext propertyContext = runner.getProcessContext();
final Map<String, Object> properties = provider.getProperties(propertyContext);
assertEquals(securityProtocol.name(), properties.get(KafkaClientComponent.SECURITY_PROTOCOL.getName()));
}
@Test
void testGetPropertiesSaslMechanismScram() {
final SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL, securityProtocol.name());
runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_256.getValue());
final PropertyContext propertyContext = runner.getProcessContext();
final Map<String, Object> properties = provider.getProperties(propertyContext);
final Object securityProtocolProperty = properties.get(KafkaClientComponent.SECURITY_PROTOCOL.getName());
assertEquals(securityProtocol.name(), securityProtocolProperty);
final Object saslConfigProperty = properties.get(KafkaClientProperty.SASL_JAAS_CONFIG.getProperty());
assertNotNull(saslConfigProperty, "SASL configuration not found");
assertTrue(saslConfigProperty.toString().contains(SCRAM_LOGIN_MODULE), "SCRAM configuration not found");
}
}