NIFI-4515 - This closes #2224. Enabled EL on Kerberos properties for Kafka 0.10 & 0.11 & 1.0 processors

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Pierre Villard 2017-10-23 14:34:18 +02:00 committed by joewitt
parent 6e7544bd37
commit 10e3b14433
8 changed files with 67 additions and 27 deletions

View File

@ -94,7 +94,7 @@ final class KafkaProcessorUtils {
+ "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
.name("sasl.kerberos.principal") .name("sasl.kerberos.principal")
@ -103,7 +103,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
.name("sasl.kerberos.keytab") .name("sasl.kerberos.keytab")
@ -112,7 +112,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false) .required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service") .name("ssl.context.service")
@ -143,7 +143,7 @@ final class KafkaProcessorUtils {
* security protocol, then Kerberos principal is provided as well * security protocol, then Kerberos principal is provided as well
*/ */
if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) {
results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
.explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
@ -152,8 +152,8 @@ final class KafkaProcessorUtils {
.build()); .build());
} }
String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue(); String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue(); String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal)) if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
|| (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
@ -295,9 +295,9 @@ final class KafkaProcessorUtils {
* @param context Context * @param context Context
*/ */
private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) { private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
String keytab = context.getProperty(USER_KEYTAB).getValue(); String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
String principal = context.getProperty(USER_PRINCIPAL).getValue(); String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue(); String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+ "useTicketCache=false " + "useTicketCache=false "

View File

@ -115,6 +115,14 @@ public class ConsumeKafkaTest {
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
runner.assertValid(); runner.assertValid();
runner.setVariable("keytab", "src/test/resources/server.properties");
runner.setVariable("principal", "nifi@APACHE.COM");
runner.setVariable("service", "kafka");
runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
runner.assertValid();
} }
} }

View File

@ -211,6 +211,14 @@ public class TestConsumeKafkaRecord_0_10 {
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
runner.assertValid(); runner.assertValid();
runner.setVariable("keytab", "src/test/resources/server.properties");
runner.setVariable("principal", "nifi@APACHE.COM");
runner.setVariable("service", "kafka");
runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
runner.assertValid();
} }
} }

View File

@ -94,7 +94,7 @@ final class KafkaProcessorUtils {
+ "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
.name("sasl.kerberos.principal") .name("sasl.kerberos.principal")
@ -103,7 +103,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
.name("sasl.kerberos.keytab") .name("sasl.kerberos.keytab")
@ -112,7 +112,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false) .required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service") .name("ssl.context.service")
@ -143,7 +143,7 @@ final class KafkaProcessorUtils {
* security protocol, then Kerberos principal is provided as well * security protocol, then Kerberos principal is provided as well
*/ */
if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) {
results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
.explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
@ -152,8 +152,8 @@ final class KafkaProcessorUtils {
.build()); .build());
} }
String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue(); String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue(); String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal)) if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
|| (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
@ -295,9 +295,9 @@ final class KafkaProcessorUtils {
* @param context Context * @param context Context
*/ */
private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) { private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
String keytab = context.getProperty(USER_KEYTAB).getValue(); String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
String principal = context.getProperty(USER_PRINCIPAL).getValue(); String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue(); String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+ "useTicketCache=false " + "useTicketCache=false "

View File

@ -115,6 +115,14 @@ public class ConsumeKafkaTest {
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
runner.assertValid(); runner.assertValid();
runner.setVariable("keytab", "src/test/resources/server.properties");
runner.setVariable("principal", "nifi@APACHE.COM");
runner.setVariable("service", "kafka");
runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
runner.assertValid();
} }
} }

View File

@ -211,6 +211,14 @@ public class TestConsumeKafkaRecord_0_11 {
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
runner.assertValid(); runner.assertValid();
runner.setVariable("keytab", "src/test/resources/server.properties");
runner.setVariable("principal", "nifi@APACHE.COM");
runner.setVariable("service", "kafka");
runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
runner.assertValid();
} }
} }

View File

@ -94,7 +94,7 @@ final class KafkaProcessorUtils {
+ "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
.name("sasl.kerberos.principal") .name("sasl.kerberos.principal")
@ -103,7 +103,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
.name("sasl.kerberos.keytab") .name("sasl.kerberos.keytab")
@ -112,7 +112,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false) .required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service") .name("ssl.context.service")
@ -143,7 +143,7 @@ final class KafkaProcessorUtils {
* security protocol, then Kerberos principal is provided as well * security protocol, then Kerberos principal is provided as well
*/ */
if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) {
results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
.explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
@ -152,8 +152,8 @@ final class KafkaProcessorUtils {
.build()); .build());
} }
String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue(); String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue(); String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal)) if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
|| (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
@ -295,9 +295,9 @@ final class KafkaProcessorUtils {
* @param context Context * @param context Context
*/ */
private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) { private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
String keytab = context.getProperty(USER_KEYTAB).getValue(); String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
String principal = context.getProperty(USER_PRINCIPAL).getValue(); String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue(); String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+ "useTicketCache=false " + "useTicketCache=false "

View File

@ -115,6 +115,14 @@ public class ConsumeKafkaTest {
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
runner.assertValid(); runner.assertValid();
runner.setVariable("keytab", "src/test/resources/server.properties");
runner.setVariable("principal", "nifi@APACHE.COM");
runner.setVariable("service", "kafka");
runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
runner.assertValid();
} }
} }