mirror of https://github.com/apache/nifi.git
NIFI-3528 Added support for keytab/principal to Kafka 0.10 processors
This closes #1606. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
556f309df0
commit
614fa6a6c4
|
@ -96,6 +96,24 @@ final class KafkaProcessorUtils {
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(false)
|
||||||
.build();
|
.build();
|
||||||
|
static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||||
|
.name("sasl.kerberos.principal")
|
||||||
|
.displayName("Kerberos Principal")
|
||||||
|
.description("The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
|
||||||
|
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.build();
|
||||||
|
static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
|
||||||
|
.name("sasl.kerberos.keytab")
|
||||||
|
.displayName("Kerberos Keytab")
|
||||||
|
.description("The Kerberos keytab that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
|
||||||
|
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.build();
|
||||||
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||||
.name("ssl.context.service")
|
.name("ssl.context.service")
|
||||||
.displayName("SSL Context Service")
|
.displayName("SSL Context Service")
|
||||||
|
@ -109,6 +127,8 @@ final class KafkaProcessorUtils {
|
||||||
BOOTSTRAP_SERVERS,
|
BOOTSTRAP_SERVERS,
|
||||||
SECURITY_PROTOCOL,
|
SECURITY_PROTOCOL,
|
||||||
KERBEROS_PRINCIPLE,
|
KERBEROS_PRINCIPLE,
|
||||||
|
USER_PRINCIPAL,
|
||||||
|
USER_KEYTAB,
|
||||||
SSL_CONTEXT_SERVICE
|
SSL_CONTEXT_SERVICE
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -131,6 +151,16 @@ final class KafkaProcessorUtils {
|
||||||
+ SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
|
+ SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue();
|
||||||
|
String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue();
|
||||||
|
if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
|
||||||
|
|| (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
|
||||||
|
results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
|
||||||
|
.explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
|
||||||
|
+ "must be set.")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//If SSL or SASL_SSL then CS must be set.
|
//If SSL or SASL_SSL then CS must be set.
|
||||||
|
@ -233,7 +263,7 @@ final class KafkaProcessorUtils {
|
||||||
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
|
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
|
||||||
: context.getProperty(propertyDescriptor).getValue();
|
: context.getProperty(propertyDescriptor).getValue();
|
||||||
|
|
||||||
if (propertyValue != null) {
|
if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
|
||||||
// If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
|
// If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
|
||||||
// or the standard NiFi time period such as "5 secs"
|
// or the standard NiFi time period such as "5 secs"
|
||||||
if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
|
if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
|
||||||
|
@ -245,6 +275,38 @@ final class KafkaProcessorUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
|
||||||
|
if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
|
||||||
|
setJaasConfig(mapToPopulate, context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259<br />
|
||||||
|
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
|
||||||
|
* <br />
|
||||||
|
* It expects something with the following format: <br />
|
||||||
|
* <br />
|
||||||
|
* <LoginModuleClass> <ControlFlag> *(<OptionName>=<OptionValue>); <br />
|
||||||
|
* ControlFlag = required / requisite / sufficient / optional
|
||||||
|
*
|
||||||
|
* @param mapToPopulate Map of configuration properties
|
||||||
|
* @param context Context
|
||||||
|
*/
|
||||||
|
private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
|
||||||
|
String keytab = context.getProperty(USER_KEYTAB).getValue();
|
||||||
|
String principal = context.getProperty(USER_PRINCIPAL).getValue();
|
||||||
|
String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue();
|
||||||
|
if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
|
||||||
|
mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
|
||||||
|
+ "useTicketCache=false "
|
||||||
|
+ "renewTicket=true "
|
||||||
|
+ "serviceName=\"" + serviceName + "\" "
|
||||||
|
+ "useKeyTab=true "
|
||||||
|
+ "keyTab=\"" + keytab + "\" "
|
||||||
|
+ "principal=\"" + principal + "\";");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
|
private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
|
||||||
|
|
|
@ -164,4 +164,29 @@ public class ConsumeKafkaTest {
|
||||||
verifyNoMoreInteractions(mockLease);
|
verifyNoMoreInteractions(mockLease);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJaasConfiguration() throws Exception {
|
||||||
|
ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10();
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(consumeKafka);
|
||||||
|
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
|
||||||
|
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
|
||||||
|
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "foo");
|
||||||
|
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
|
||||||
|
|
||||||
|
runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
|
||||||
|
runner.assertNotValid();
|
||||||
|
|
||||||
|
runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
|
||||||
|
runner.assertNotValid();
|
||||||
|
|
||||||
|
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
|
||||||
|
runner.assertNotValid();
|
||||||
|
|
||||||
|
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
|
||||||
|
runner.assertValid();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue