diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index 8d5f2b4599..b0d4566790 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -139,6 +139,16 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + static final PropertyDescriptor KUDU_SASL_PROTOCOL_NAME = new Builder() + .name("kudu-sasl-protocol-name") + .displayName("Kudu SASL Protocol Name") + .description("The SASL protocol name to use for authenticating via Kerberos. Must match the service principal name.") + .required(false) + .defaultValue("kudu") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + private volatile KuduClient kuduClient; private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock(); private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock(); @@ -200,6 +210,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); final int operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final int adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final String saslProtocolName = context.getProperty(KUDU_SASL_PROTOCOL_NAME).evaluateAttributeExpressions().getValue(); final int workerCount = context.getProperty(WORKER_COUNT).asInteger(); // Create Executor following approach of Executors.newCachedThreadPool() using worker count as maximum pool size @@ -217,6 +228,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { return new KuduClient.KuduClientBuilder(masters) .defaultOperationTimeoutMs(operationTimeout) .defaultSocketReadTimeoutMs(adminOperationTimeout) + .saslProtocolName(saslProtocolName) .workerCount(workerCount) .nioExecutor(nioExecutor) .build(); diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 49f0ca776c..ca273b61b3 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -300,6 +300,7 @@ public class PutKudu extends AbstractKuduProcessor { properties.add(KUDU_OPERATION_TIMEOUT_MS); properties.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS); properties.add(WORKER_COUNT); + properties.add(KUDU_SASL_PROTOCOL_NAME); return properties; } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java index 7c2c84bd03..e9a7bdbcf1 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java @@ -68,6 +68,8 @@ public class ITPutKudu { new MiniKuduCluster.MiniKuduClusterBuilder() .addMasterServerFlag("--use_hybrid_clock=false") .addTabletServerFlag("--use_hybrid_clock=false") + .enableKerberos() + .principal("oryx") ); private TestRunner testRunner; @@ -99,6 +101,9 @@ public class ITPutKudu { testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "false"); testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory"); testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT_IGNORE.toString()); + testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "test-user"); + testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "test-user"); + testRunner.setProperty(PutKudu.KUDU_SASL_PROTOCOL_NAME, "oryx"); } private void createKuduTable() throws KuduException { diff --git a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml index 6ac0899465..4fbf6453f3 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml @@ -29,7 +29,7 @@ None - 1.14.0 + 1.15.0