diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index de289959ea..7ac330e46b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -94,7 +94,7 @@ final class KafkaProcessorUtils { + "It is ignored unless one of the SASL options of the are selected.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() .name("sasl.kerberos.principal") @@ -103,7 +103,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() .name("sasl.kerberos.keytab") @@ -112,7 +112,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl.context.service") @@ -143,7 +143,7 @@ final class KafkaProcessorUtils { * security protocol, then Kerberos principal is provided as well */ if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { - String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); + String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" @@ -152,8 +152,8 @@ final class KafkaProcessorUtils { .build()); } - String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue(); - String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue(); + String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); + String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal)) || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) @@ -295,9 +295,9 @@ final class KafkaProcessorUtils { * @param context Context */ private static void setJaasConfig(Map mapToPopulate, ProcessContext context) { - String keytab = context.getProperty(USER_KEYTAB).getValue(); - String principal = context.getProperty(USER_PRINCIPAL).getValue(); - String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue(); + String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); + String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); + String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " + "useTicketCache=false " diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 3496ea0ca3..69dc7dee4b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -115,6 +115,14 @@ public class ConsumeKafkaTest { runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.assertValid(); + + runner.setVariable("keytab", "src/test/resources/server.properties"); + runner.setVariable("principal", "nifi@APACHE.COM"); + runner.setVariable("service", "kafka"); + runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); + runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); + runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.assertValid(); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java index 3da74e46df..569a8873b7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java @@ -211,6 +211,14 @@ public class TestConsumeKafkaRecord_0_10 { runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.assertValid(); + + runner.setVariable("keytab", "src/test/resources/server.properties"); + runner.setVariable("principal", "nifi@APACHE.COM"); + runner.setVariable("service", "kafka"); + runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); + runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); + runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.assertValid(); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index de289959ea..7ac330e46b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -94,7 +94,7 @@ final class KafkaProcessorUtils { + "It is ignored unless one of the SASL options of the are selected.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() .name("sasl.kerberos.principal") @@ -103,7 +103,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() .name("sasl.kerberos.keytab") @@ -112,7 +112,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl.context.service") @@ -143,7 +143,7 @@ final class KafkaProcessorUtils { * security protocol, then Kerberos principal is provided as well */ if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { - String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); + String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" @@ -152,8 +152,8 @@ final class KafkaProcessorUtils { .build()); } - String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue(); - String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue(); + String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); + String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal)) || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) @@ -295,9 +295,9 @@ final class KafkaProcessorUtils { * @param context Context */ private static void setJaasConfig(Map mapToPopulate, ProcessContext context) { - String keytab = context.getProperty(USER_KEYTAB).getValue(); - String principal = context.getProperty(USER_PRINCIPAL).getValue(); - String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue(); + String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); + String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); + String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " + "useTicketCache=false " diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index b1edd1f718..4778f1adcc 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -115,6 +115,14 @@ public class ConsumeKafkaTest { runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.assertValid(); + + runner.setVariable("keytab", "src/test/resources/server.properties"); + runner.setVariable("principal", "nifi@APACHE.COM"); + runner.setVariable("service", "kafka"); + runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); + runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); + runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.assertValid(); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java index 0f25759490..42a66b1c11 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java @@ -211,6 +211,14 @@ public class TestConsumeKafkaRecord_0_11 { runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.assertValid(); + + runner.setVariable("keytab", "src/test/resources/server.properties"); + runner.setVariable("principal", "nifi@APACHE.COM"); + runner.setVariable("service", "kafka"); + runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); + runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); + runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.assertValid(); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index de289959ea..7ac330e46b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -94,7 +94,7 @@ final class KafkaProcessorUtils { + "It is ignored unless one of the SASL options of the are selected.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() .name("sasl.kerberos.principal") @@ -103,7 +103,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() .name("sasl.kerberos.keytab") @@ -112,7 +112,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl.context.service") @@ -143,7 +143,7 @@ final class KafkaProcessorUtils { * security protocol, then Kerberos principal is provided as well */ if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { - String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); + String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" @@ -152,8 +152,8 @@ final class KafkaProcessorUtils { .build()); } - String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue(); - String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue(); + String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); + String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal)) || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) @@ -295,9 +295,9 @@ final class KafkaProcessorUtils { * @param context Context */ private static void setJaasConfig(Map mapToPopulate, ProcessContext context) { - String keytab = context.getProperty(USER_KEYTAB).getValue(); - String principal = context.getProperty(USER_PRINCIPAL).getValue(); - String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue(); + String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); + String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); + String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " + "useTicketCache=false " diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 10ac3984f4..f6785f97ba 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -115,6 +115,14 @@ public class ConsumeKafkaTest { runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); runner.assertValid(); + + runner.setVariable("keytab", "src/test/resources/server.properties"); + runner.setVariable("principal", "nifi@APACHE.COM"); + runner.setVariable("service", "kafka"); + runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); + runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); + runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.assertValid(); } }