mirror of https://github.com/apache/nifi.git
NIFI-7278 Adding support for SCRAM-SHA-512 to Kafka 2.0 processors
This commit is contained in:
parent
4d84c144d2
commit
5fd25d6235
|
@ -86,7 +86,11 @@ public final class KafkaProcessorUtils {
|
||||||
"be populated when using this mechanism.");
|
"be populated when using this mechanism.");
|
||||||
|
|
||||||
static final String SCRAM_SHA256_VALUE = "SCRAM-SHA-256";
|
static final String SCRAM_SHA256_VALUE = "SCRAM-SHA-256";
|
||||||
static final AllowableValue SASL_MECHANISM_SCRAM = new AllowableValue(SCRAM_SHA256_VALUE, SCRAM_SHA256_VALUE,"The Salted Challenge Response Authentication Mechanism. " +
|
static final AllowableValue SASL_MECHANISM_SCRAM_SHA256 = new AllowableValue(SCRAM_SHA256_VALUE, SCRAM_SHA256_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-256. " +
|
||||||
|
"The username and password properties must be set when using this mechanism.");
|
||||||
|
|
||||||
|
static final String SCRAM_SHA512_VALUE = "SCRAM-SHA-512";
|
||||||
|
static final AllowableValue SASL_MECHANISM_SCRAM_SHA512 = new AllowableValue(SCRAM_SHA512_VALUE, SCRAM_SHA512_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-512. " +
|
||||||
"The username and password properties must be set when using this mechanism.");
|
"The username and password properties must be set when using this mechanism.");
|
||||||
|
|
||||||
public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
|
||||||
|
@ -113,7 +117,7 @@ public final class KafkaProcessorUtils {
|
||||||
.description("The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.")
|
.description("The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||||
.allowableValues(SASL_MECHANISM_GSSAPI, SASL_MECHANISM_PLAIN, SASL_MECHANISM_SCRAM)
|
.allowableValues(SASL_MECHANISM_GSSAPI, SASL_MECHANISM_PLAIN, SASL_MECHANISM_SCRAM_SHA256, SASL_MECHANISM_SCRAM_SHA512)
|
||||||
.defaultValue(GSSAPI_VALUE)
|
.defaultValue(GSSAPI_VALUE)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder()
|
||||||
|
@ -148,7 +152,7 @@ public final class KafkaProcessorUtils {
|
||||||
static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
|
||||||
.name("sasl.username")
|
.name("sasl.username")
|
||||||
.displayName("Username")
|
.displayName("Username")
|
||||||
.description("The username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
|
.description("The username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
|
||||||
.required(false)
|
.required(false)
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
@ -156,7 +160,7 @@ public final class KafkaProcessorUtils {
|
||||||
static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
|
||||||
.name("sasl.password")
|
.name("sasl.password")
|
||||||
.displayName("Password")
|
.displayName("Password")
|
||||||
.description("The password for the given username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
|
.description("The password for the given username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
|
||||||
.required(false)
|
.required(false)
|
||||||
.sensitive(true)
|
.sensitive(true)
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
@ -165,7 +169,8 @@ public final class KafkaProcessorUtils {
|
||||||
static final PropertyDescriptor TOKEN_AUTH = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor TOKEN_AUTH = new PropertyDescriptor.Builder()
|
||||||
.name("sasl.token.auth")
|
.name("sasl.token.auth")
|
||||||
.displayName("Token Auth")
|
.displayName("Token Auth")
|
||||||
.description("When " + SASL_MECHANISM.getDisplayName() + " is " + SCRAM_SHA256_VALUE + ", this property indicates if token authentication should be used.")
|
.description("When " + SASL_MECHANISM.getDisplayName() + " is " + SCRAM_SHA256_VALUE + " or " + SCRAM_SHA512_VALUE
|
||||||
|
+ ", this property indicates if token authentication should be used.")
|
||||||
.required(false)
|
.required(false)
|
||||||
.allowableValues("true", "false")
|
.allowableValues("true", "false")
|
||||||
.defaultValue("false")
|
.defaultValue("false")
|
||||||
|
@ -263,14 +268,16 @@ public final class KafkaProcessorUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
// validate that if SASL Mechanism is PLAIN or SCRAM, then username and password are both provided
|
// validate that if SASL Mechanism is PLAIN or SCRAM, then username and password are both provided
|
||||||
if (SASL_MECHANISM_PLAIN.getValue().equals(saslMechanism) || SASL_MECHANISM_SCRAM.getValue().equals(saslMechanism)) {
|
if (SASL_MECHANISM_PLAIN.getValue().equals(saslMechanism)
|
||||||
|
|| SASL_MECHANISM_SCRAM_SHA256.getValue().equals(saslMechanism)
|
||||||
|
|| SASL_MECHANISM_SCRAM_SHA512.getValue().equals(saslMechanism)) {
|
||||||
final String username = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
|
final String username = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
|
||||||
if (StringUtils.isBlank(username)) {
|
if (StringUtils.isBlank(username)) {
|
||||||
results.add(new ValidationResult.Builder()
|
results.add(new ValidationResult.Builder()
|
||||||
.subject(USERNAME.getDisplayName())
|
.subject(USERNAME.getDisplayName())
|
||||||
.valid(false)
|
.valid(false)
|
||||||
.explanation("A username is required when " + SASL_MECHANISM.getDisplayName()
|
.explanation("A username is required when " + SASL_MECHANISM.getDisplayName()
|
||||||
+ " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
|
+ " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -280,7 +287,7 @@ public final class KafkaProcessorUtils {
|
||||||
.subject(PASSWORD.getDisplayName())
|
.subject(PASSWORD.getDisplayName())
|
||||||
.valid(false)
|
.valid(false)
|
||||||
.explanation("A password is required when " + SASL_MECHANISM.getDisplayName()
|
.explanation("A password is required when " + SASL_MECHANISM.getDisplayName()
|
||||||
+ " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
|
+ " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -443,6 +450,7 @@ public final class KafkaProcessorUtils {
|
||||||
setPlainJaasConfig(mapToPopulate, context);
|
setPlainJaasConfig(mapToPopulate, context);
|
||||||
break;
|
break;
|
||||||
case SCRAM_SHA256_VALUE:
|
case SCRAM_SHA256_VALUE:
|
||||||
|
case SCRAM_SHA512_VALUE:
|
||||||
setScramJaasConfig(mapToPopulate, context);
|
setScramJaasConfig(mapToPopulate, context);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -236,7 +236,7 @@ public class TestConsumeKafkaRecord_2_0 {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJaasConfigurationWithScramMechanism() {
|
public void testJaasConfigurationWithScram256Mechanism() {
|
||||||
runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
|
runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
|
||||||
runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
|
runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
|
||||||
runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
|
runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
|
||||||
|
@ -257,6 +257,28 @@ public class TestConsumeKafkaRecord_2_0 {
|
||||||
runner.assertNotValid();
|
runner.assertNotValid();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJaasConfigurationWithScram512Mechanism() {
|
||||||
|
runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
|
||||||
|
runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
|
||||||
|
runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
|
||||||
|
|
||||||
|
runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
|
||||||
|
runner.assertNotValid();
|
||||||
|
|
||||||
|
runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, KafkaProcessorUtils.SCRAM_SHA512_VALUE);
|
||||||
|
runner.assertNotValid();
|
||||||
|
|
||||||
|
runner.setProperty(KafkaProcessorUtils.USERNAME, "user1");
|
||||||
|
runner.assertNotValid();
|
||||||
|
|
||||||
|
runner.setProperty(KafkaProcessorUtils.PASSWORD, "password");
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.removeProperty(KafkaProcessorUtils.USERNAME);
|
||||||
|
runner.assertNotValid();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNonSaslSecurityProtocol() {
|
public void testNonSaslSecurityProtocol() {
|
||||||
runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
|
runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
|
||||||
|
|
Loading…
Reference in New Issue