diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index cbe2e24b16..de289959ea 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -96,6 +96,24 @@ final class KafkaProcessorUtils {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false)
.build();
+ static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
+ .name("sasl.kerberos.principal")
+ .displayName("Kerberos Principal")
+ .description("The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
+ + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+ static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
+ .name("sasl.kerberos.keytab")
+ .displayName("Kerberos Keytab")
+ .description("The Kerberos keytab that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
+ + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
+ .required(false)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service")
.displayName("SSL Context Service")
@@ -109,6 +127,8 @@ final class KafkaProcessorUtils {
BOOTSTRAP_SERVERS,
SECURITY_PROTOCOL,
KERBEROS_PRINCIPLE,
+ USER_PRINCIPAL,
+ USER_KEYTAB,
SSL_CONTEXT_SERVICE
);
}
@@ -131,6 +151,16 @@ final class KafkaProcessorUtils {
+ SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
.build());
}
+
+ String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue();
+ String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue();
+ if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
+ || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
+ results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+ .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
+ + "must be set.")
+ .build());
+ }
}
//If SSL or SASL_SSL then CS must be set.
@@ -233,7 +263,7 @@ final class KafkaProcessorUtils {
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
: context.getProperty(propertyDescriptor).getValue();
- if (propertyValue != null) {
+ if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
// If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
// or the standard NiFi time period such as "5 secs"
if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
@@ -245,6 +275,38 @@ final class KafkaProcessorUtils {
}
}
}
+
+ String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+ if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+ setJaasConfig(mapToPopulate, context);
+ }
+ }
+
+ /**
+ * Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259
+ * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients
+ *
+ * It expects something with the following format:
+ *
+ * <LoginModuleClass> <ControlFlag> *(<OptionName>=<OptionValue>);
+ * ControlFlag = required / requisite / sufficient / optional
+ *
+ * @param mapToPopulate Map of configuration properties
+ * @param context Context
+ */
+ private static void setJaasConfig(Map mapToPopulate, ProcessContext context) {
+ String keytab = context.getProperty(USER_KEYTAB).getValue();
+ String principal = context.getProperty(USER_PRINCIPAL).getValue();
+ String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue();
+ if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
+ mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+ + "useTicketCache=false "
+ + "renewTicket=true "
+ + "serviceName=\"" + serviceName + "\" "
+ + "useKeyTab=true "
+ + "keyTab=\"" + keytab + "\" "
+ + "principal=\"" + principal + "\";");
+ }
}
private static boolean isStaticStringFieldNamePresent(final String name, final Class>... classes) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 9a74c44746..9b380d568a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -164,4 +164,29 @@ public class ConsumeKafkaTest {
verifyNoMoreInteractions(mockLease);
}
+ @Test
+ public void testJaasConfiguration() throws Exception {
+ ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10();
+ TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+ runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
+ runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "foo");
+ runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+
+ runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+ runner.assertValid();
+
+ runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+ runner.assertValid();
+ }
+
}