diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index 9636cf6d06..7561ee48c4 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -17,26 +17,29 @@ package org.apache.nifi.processors.kudu; -import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.AsyncKuduClient; +import org.apache.kudu.client.Delete; +import org.apache.kudu.client.Insert; import org.apache.kudu.client.KuduClient; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.OperationResponse; import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RowError; import org.apache.kudu.client.SessionConfiguration; -import org.apache.kudu.client.Delete; -import org.apache.kudu.client.Insert; -import org.apache.kudu.client.Upsert; import org.apache.kudu.client.Update; +import org.apache.kudu.client.Upsert; +import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.AbstractProcessor; @@ -44,17 +47,21 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.krb.KerberosAction; import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosPasswordUser; import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StringUtils; import javax.security.auth.login.LoginException; import java.math.BigDecimal; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Arrays; -import java.util.concurrent.TimeUnit; +import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; public abstract class AbstractKuduProcessor extends AbstractProcessor { @@ -74,6 +81,25 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { .identifiesControllerService(KerberosCredentialsService.class) .build(); + static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() + .name("kerberos-principal") + .displayName("Kerberos Principal") + .description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder() + .name("kerberos-password") + .displayName("Kerberos Password") + .description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder() .name("kudu-operations-timeout-ms") .displayName("Kudu Operation Timeout") @@ -109,12 +135,16 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { public void createKuduClient(ProcessContext context) throws LoginException { final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue(); if (credentialsService != null) { - final String keytab = credentialsService.getKeytab(); - final String principal = credentialsService.getPrincipal(); - kerberosUser = loginKerberosUser(principal, keytab); + kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab()); + } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) { + kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword); + } + if (kerberosUser != null) { final KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger()); this.kuduClient = kerberosAction.execute(); } else { @@ -146,12 +176,54 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { } } - protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException { + protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException { final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab); kerberosUser.login(); return kerberosUser; } + protected KerberosUser loginKerberosPasswordUser(final String principal, final String password) throws LoginException { + final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password); + kerberosUser.login(); + return kerberosUser; + } + + @Override + protected Collection customValidate(ValidationContext context) { + final List results = new ArrayList<>(); + + final boolean kerberosPrincipalProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue()); + final boolean kerberosPasswordProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PASSWORD).getValue()); + + if (kerberosPrincipalProvided && !kerberosPasswordProvided) { + results.add(new ValidationResult.Builder() + .subject(KERBEROS_PASSWORD.getDisplayName()) + .valid(false) + .explanation("a password must be provided for the given principal") + .build()); + } + + if (kerberosPasswordProvided && !kerberosPrincipalProvided) { + results.add(new ValidationResult.Builder() + .subject(KERBEROS_PRINCIPAL.getDisplayName()) + .valid(false) + .explanation("a principal must be provided for the given password") + .build()); + } + + final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + if (kerberosCredentialsService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) { + results.add(new ValidationResult.Builder() + .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName()) + .valid(false) + .explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time") + .build()); + } + + return results; + } + @OnStopped public void shutdown() throws Exception { try { diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 3769932269..1893c64a93 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -231,6 +231,8 @@ public class PutKudu extends AbstractKuduProcessor { properties.add(KUDU_MASTERS); properties.add(TABLE_NAME); properties.add(KERBEROS_CREDENTIALS_SERVICE); + properties.add(KERBEROS_PRINCIPAL); + properties.add(KERBEROS_PASSWORD); properties.add(SKIP_HEAD_LINE); properties.add(LOWERCASE_FIELD_NAMES); properties.add(HANDLE_SCHEMA_DRIFT); diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java index ad1071595e..e2935b0bff 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java @@ -117,7 +117,16 @@ public class MockPutKudu extends PutKudu { } @Override - protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException { + protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException { + return createMockKerberosUser(principal); + } + + @Override + protected KerberosUser loginKerberosPasswordUser(String principal, String password) throws LoginException { + return createMockKerberosUser(principal); + } + + private KerberosUser createMockKerberosUser(final String principal) { return new KerberosUser() { @Override diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index b55695f4cc..303a03b00b 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -131,6 +131,32 @@ public class TestPutKudu { testRunner.enableControllerService(readerFactory); } + @Test + public void testCustomValidate() throws InitializationException { + createRecordReader(1); + + testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal"); + testRunner.assertNotValid(); + + testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL); + testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password"); + testRunner.assertNotValid(); + + testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal"); + testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password"); + testRunner.assertValid(); + + final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab"); + testRunner.addControllerService("kerb", kerberosCredentialsService); + testRunner.enableControllerService(kerberosCredentialsService); + testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb"); + testRunner.assertNotValid(); + + testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL); + testRunner.removeProperty(PutKudu.KERBEROS_PASSWORD); + testRunner.assertValid(); + } + @Test public void testWriteKuduWithDefaults() throws IOException, InitializationException { createRecordReader(100);