From 167853163884a20c27cd44b38679918220774411 Mon Sep 17 00:00:00 2001
From: jstorck
Date: Fri, 28 Feb 2020 01:18:06 -0500
Subject: [PATCH] NIFI-7025: Initial commit adding Kerberos Password feature for Hive components

Kerberos Password property should not support EL; this includes a change to KerberosProperties which is also used by the HDFS processors (AbstractHadoopProcessor)
Added wiring in a KerberosContext to a TestRunner's MockProcessorInitializationContext
Removed synchronization blocks around KerberosUser.checkTGTAndRelogin, since that method is already synchronized
Updated AbstractHadoopProcessor to have a boolean accessor method to determine if explicit keytab configuration is allowed
Removed synchronization block from HiveConnectionPool's getConnection method (in Hive, Hive_1_1, Hive3 modules), since new TGT acquisition is handled by the KerberosUser implementation. If UGI is used to relogin, synchronization is handled internally by UGI.
Added Kerberos Principal and Kerberos Password properties to Hive, Hive_1_1, and Hive3 components
Hive, Hive_1_1, and Hive3 components now use KerberosUser implementations to authenticate with a KDC
Updated handling of the NIFI_ALLOW_EXPLICIT_KEYTAB environment variable in Hive and Hive3 components. An accessor method has been added that uses Boolean.parseBoolean, which returns true if the environment variable is set to true, and false otherwise (including when the environment variable is unset).

Addressing PR feedback

Addressing PR feedback

This closes #4102.
--- .../security/krb/AbstractKerberosUser.java | 4 +- .../security/krb/KeytabConfiguration.java | 4 + .../MockProcessorInitializationContext.java | 15 ++- .../util/StandardProcessorTestRunner.java | 15 ++- .../org/apache/nifi/util/TestRunners.java | 22 +++++ .../nifi/hadoop/KerberosProperties.java | 3 +- .../org/apache/nifi/hadoop/SecurityUtil.java | 9 ++ .../hadoop/AbstractHadoopProcessor.java | 29 +++--- .../hadoop/AbstractHadoopProcessorSpec.groovy | 30 +++--- .../hadoop/SimpleHadoopProcessor.java | 10 ++ .../nifi/dbcp/hive/HiveConnectionPool.java | 83 +++++++++++----- .../processors/hive/PutHiveStreaming.java | 65 ++++++++++--- .../nifi/util/hive/HiveConfigurator.java | 24 ++++- .../processors/hive/TestPutHiveStreaming.java | 24 ++++- .../nifi/dbcp/hive/Hive3ConnectionPool.java | 95 ++++++++++++------ .../processors/hive/PutHive3Streaming.java | 63 ++++++++++-- .../nifi/util/hive/HiveConfigurator.java | 24 ++++- .../hive/TestPutHive3Streaming.java | 10 +- .../dbcp/hive/Hive_1_1ConnectionPool.java | 97 +++++++++++++++---- .../nifi/util/hive/HiveConfigurator.java | 24 ++++- 20 files changed, 504 insertions(+), 146 deletions(-) diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java index 6764497ce9..5278618bb4 100644 --- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java +++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java @@ -82,7 +82,9 @@ public abstract class AbstractKerberosUser implements KerberosUser { loggedIn.set(true); LOGGER.debug("Successful login for {}", new Object[]{principal}); } catch (LoginException le) { - throw new LoginException("Unable to login with " + principal + " due to: " + le.getMessage()); + LoginException loginException = new LoginException("Unable to login with " + 
principal + " due to: " + le.getMessage()); + loginException.setStackTrace(le.getStackTrace()); + throw loginException; } } diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabConfiguration.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabConfiguration.java index 24af9b207e..a038c1d03a 100644 --- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabConfiguration.java +++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabConfiguration.java @@ -17,6 +17,8 @@ package org.apache.nifi.security.krb; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.Configuration; @@ -27,6 +29,7 @@ import java.util.Map; * Custom JAAS Configuration object for a provided principal and keytab. */ public class KeytabConfiguration extends Configuration { + private static final Logger LOGGER = LoggerFactory.getLogger(KeytabConfiguration.class); private final String principal; private final String keytabFile; @@ -63,6 +66,7 @@ public class KeytabConfiguration extends Configuration { final String krbLoginModuleName = ConfigurationUtil.IS_IBM ? 
ConfigurationUtil.IBM_KRB5_LOGIN_MODULE : ConfigurationUtil.SUN_KRB5_LOGIN_MODULE; + LOGGER.debug("krbLoginModuleName: {}, configuration options: {}", krbLoginModuleName, options); this.kerberosKeytabConfigEntry = new AppConfigurationEntry( krbLoginModuleName, AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options); } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java index 6a26371a7d..d48fc3de23 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java @@ -23,6 +23,7 @@ import java.util.UUID; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.NodeTypeProvider; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.ProcessorInitializationContext; @@ -31,15 +32,21 @@ public class MockProcessorInitializationContext implements ProcessorInitializati private final MockComponentLog logger; private final String processorId; private final MockProcessContext context; + private final KerberosContext kerberosContext; public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context) { - this(processor, context, null); + this(processor, context, null, null); } public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context, final MockComponentLog logger) { + this(processor, context, logger, null); + } + + public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context, final MockComponentLog logger, KerberosContext kerberosContext) { processorId = UUID.randomUUID().toString(); this.logger = logger == null ? 
new MockComponentLog(processorId, processor) : logger; this.context = context; + this.kerberosContext = kerberosContext; } @Override @@ -94,16 +101,16 @@ public class MockProcessorInitializationContext implements ProcessorInitializati @Override public String getKerberosServicePrincipal() { - return null; //this needs to be wired in. + return kerberosContext != null ? kerberosContext.getKerberosServicePrincipal() : null; } @Override public File getKerberosServiceKeytab() { - return null; //this needs to be wired in. + return kerberosContext != null ? kerberosContext.getKerberosServiceKeytab() : null; } @Override public File getKerberosConfigurationFile() { - return null; //this needs to be wired in. + return kerberosContext != null ? kerberosContext.getKerberosConfigurationFile() : null; } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index d995f8ec38..40010c6b07 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -64,6 +64,7 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; @@ -77,6 +78,7 @@ public class StandardProcessorTestRunner implements TestRunner { private final Processor processor; private final MockProcessContext context; + private final KerberosContext kerberosContext; private final MockFlowFileQueue flowFileQueue; private final SharedSessionState sharedState; private final AtomicLong idGenerator; @@ -99,10 +101,18 @@ public class StandardProcessorTestRunner 
implements TestRunner { } StandardProcessorTestRunner(final Processor processor, String processorName) { - this(processor, processorName, null); + this(processor, processorName, null, null); + } + + StandardProcessorTestRunner(final Processor processor, String processorName, KerberosContext kerberosContext) { + this(processor, processorName, null, kerberosContext); } StandardProcessorTestRunner(final Processor processor, String processorName, MockComponentLog logger) { + this(processor, processorName, logger, null); + } + + StandardProcessorTestRunner(final Processor processor, String processorName, MockComponentLog logger, KerberosContext kerberosContext) { this.processor = processor; this.idGenerator = new AtomicLong(0L); this.sharedState = new SharedSessionState(processor, idGenerator); @@ -111,8 +121,9 @@ public class StandardProcessorTestRunner implements TestRunner { this.processorStateManager = new MockStateManager(processor); this.variableRegistry = new MockVariableRegistry(); this.context = new MockProcessContext(processor, processorName, processorStateManager, variableRegistry); + this.kerberosContext = kerberosContext; - final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context, logger); + final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context, logger, kerberosContext); processor.initialize(mockInitContext); this.logger = mockInitContext.getLogger(); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java index 97924738aa..8370e88b8a 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.util; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.processor.Processor; public class TestRunners { @@ -30,6 
+31,16 @@ public class TestRunners { return newTestRunner(processor,processor.getClass().getName()); } + /** + * Returns a {@code TestRunner} for the given {@code Processor} which uses the given {@code KerberosContext}. + * @param processor the {@code Processor} under test + * @param kerberosContext the {@code KerberosContext} used during the test + * @return + */ + public static TestRunner newTestRunner(final Processor processor, KerberosContext kerberosContext) { + return newTestRunner(processor,processor.getClass().getName(), kerberosContext); + } + /** * Returns a {@code TestRunner} for the given {@code Processor}. * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()} @@ -52,6 +63,17 @@ public class TestRunners { return new StandardProcessorTestRunner(processor, name); } + /** + * Returns a {@code TestRunner} for the given {@code Processor} and {@code KerberosContext}. + * @param processor the {@code Processor} under test + * @param name the name to give the {@code Processor} + * @param kerberosContext the {@code KerberosContext} used during the test + * @return a {@code TestRunner} + */ + public static TestRunner newTestRunner(final Processor processor, String name, KerberosContext kerberosContext) { + return new StandardProcessorTestRunner(processor, name, kerberosContext); + } + /** * Returns a {@code TestRunner} for the given {@code Processor}. * The processor name available from {@code TestRunner.getProcessContext().getName()} will be the passed name. 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java index ba5e9ecd3f..5977b6cefc 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java @@ -104,8 +104,7 @@ public class KerberosProperties { .name("Kerberos Password") .required(false) .description("Kerberos password associated with the principal.") - .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .sensitive(true) .build(); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java index 6a8f079960..eeabaf095e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java @@ -78,6 +78,15 @@ public class SecurityUtil { return UserGroupInformation.getCurrentUser(); } + /** + * Authenticates a {@link KerberosUser} and acquires a {@link UserGroupInformation} instance using {@link UserGroupInformation#getUGIFromSubject(Subject)}. + * The {@link UserGroupInformation} will use the given {@link Configuration}. 
+ * + * @param config The Configuration to apply to the acquired UserGroupInformation instance + * @param kerberosUser The KerberosUser to authenticate + * @return A UserGroupInformation instance created using the Subject of the given KerberosUser + * @throws IOException if authentication fails + */ public static synchronized UserGroupInformation getUgiForKerberosUser(final Configuration config, final KerberosUser kerberosUser) throws IOException { UserGroupInformation.setConfiguration(config); try { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 69ea716019..7a59170a1a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -184,12 +184,11 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); - final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).evaluateAttributeExpressions().getValue(); + final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue(); final KerberosCredentialsService credentialsService = 
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String resolvedPrincipal; final String resolvedKeytab; - final String resolvedPassword; if (credentialsService == null) { resolvedPrincipal = explicitPrincipal; resolvedKeytab = explicitKeytab; @@ -197,7 +196,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { resolvedPrincipal = credentialsService.getPrincipal(); resolvedKeytab = credentialsService.getKeytab(); } - resolvedPassword = explicitPassword; final List results = new ArrayList<>(); @@ -220,7 +218,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { final Configuration conf = resources.getConfiguration(); results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword( - this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, resolvedPassword, getLogger())); + this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger())); } catch (final IOException e) { results.add(new ValidationResult.Builder() @@ -238,8 +236,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { .build()); } - final String allowExplicitKeytabVariable = getAllowExplicitKeytabEnvironmentVariable(); - if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && explicitKeytab != null) { + if (!isAllowExplicitKeytab() && explicitKeytab != null) { results.add(new ValidationResult.Builder() .subject("Kerberos Credentials") .valid(false) @@ -390,7 +387,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { if (SecurityUtil.isSecurityEnabled(config)) { String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); - String password = 
context.getProperty(kerberosProperties.getKerberosPassword()).evaluateAttributeExpressions().getValue(); + String password = context.getProperty(kerberosProperties.getKerberosPassword()).getValue(); // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab. // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null. @@ -545,15 +542,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { if (hdfsResources.get().getKerberosUser() != null) { // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring KerberosUser kerberosUser = hdfsResources.get().getKerberosUser(); - synchronized (kerberosUser) { - getLogger().debug("kerberosUser is " + kerberosUser); - try { - getLogger().debug("checking TGT on kerberosUser [{}]" + kerberosUser); - kerberosUser.checkTGTAndRelogin(); - } catch (LoginException e) { - throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); + getLogger().debug("kerberosUser is " + kerberosUser); + try { + getLogger().debug("checking TGT on kerberosUser " + kerberosUser); + kerberosUser.checkTGTAndRelogin(); + } catch (LoginException e) { + throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); } - } } else { getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser"); } @@ -563,8 +558,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { /* * Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables. 
*/ - String getAllowExplicitKeytabEnvironmentVariable() { - return System.getenv(ALLOW_EXPLICIT_KEYTAB); + boolean isAllowExplicitKeytab() { + return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB)); } static protected class HdfsResources { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/groovy/org/apache/nifi/processors/hadoop/AbstractHadoopProcessorSpec.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/groovy/org/apache/nifi/processors/hadoop/AbstractHadoopProcessorSpec.groovy index 9b4faf560f..49a835fab8 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/groovy/org/apache/nifi/processors/hadoop/AbstractHadoopProcessorSpec.groovy +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/groovy/org/apache/nifi/processors/hadoop/AbstractHadoopProcessorSpec.groovy @@ -74,22 +74,22 @@ class AbstractHadoopProcessorSpec extends Specification { actualValidationErrors.size() == expectedValidationErrorCount where: - testName | configuredPrincipal | configuredKeytab | configuredPassword | allowExplicitKeytab | configuredKeytabCredentialsService | configuredKeytabCredentialsServicePrincipal | configuredKeytabCredentialsServiceKeytab || expectedValidationErrorCount - "success case 1" | "principal" | "keytab" | null | "true" | false | null | null || 0 - "success case 2" | "principal" | null | "password" | "true" | false | null | null || 0 - "success case 3" | "principal" | null | "password" | "false" | false | null | null || 0 - "success case 4" | null | null | null | "true" | true | "principal" | "keytab" || 0 - "success case 5" | null | null | null | "false" | true | "principal" | "keytab" || 0 + testName | configuredPrincipal | configuredKeytab | configuredPassword | allowExplicitKeytab | configuredKeytabCredentialsService | configuredKeytabCredentialsServicePrincipal | configuredKeytabCredentialsServiceKeytab || expectedValidationErrorCount + "success case 1" | 
"principal" | "keytab" | null | true | false | null | null || 0 + "success case 2" | "principal" | null | "password" | true | false | null | null || 0 + "success case 3" | "principal" | null | "password" | false | false | null | null || 0 + "success case 4" | null | null | null | true | true | "principal" | "keytab" || 0 + "success case 5" | null | null | null | false | true | "principal" | "keytab" || 0 // do not allow explicit keytab, but provide one anyway; validation fails - "failure case 1" | "principal" | "keytab" | null | "false" | false | null | null || 1 - "failure case 2" | null | "keytab" | null | "false" | false | null | null || 2 + "failure case 1" | "principal" | "keytab" | null | false | false | null | null || 1 + "failure case 2" | null | "keytab" | null | false | false | null | null || 2 // keytab credentials service is provided, but explicit properties for principal, password, or keytab are also provided; validation fails - "failure case 3" | "principal" | null | null | "true" | true | "principal" | "keytab" || 1 - "failure case 4" | null | "keytab" | null | "true" | true | "principal" | "keytab" || 1 - "failure case 5" | null | null | "password" | "true" | true | "principal" | "keytab" || 2 - "failure case 6" | "principal" | null | null | "false" | true | "principal" | "keytab" || 1 - "failure case 7" | null | "keytab" | null | "false" | true | "principal" | "keytab" || 2 - "failure case 8" | null | null | "password" | "false" | true | "principal" | "keytab" || 2 + "failure case 3" | "principal" | null | null | true | true | "principal" | "keytab" || 1 + "failure case 4" | null | "keytab" | null | true | true | "principal" | "keytab" || 1 + "failure case 5" | null | null | "password" | true | true | "principal" | "keytab" || 2 + "failure case 6" | "principal" | null | null | false | true | "principal" | "keytab" || 1 + "failure case 7" | null | "keytab" | null | false | true | "principal" | "keytab" || 2 + "failure case 8" | null | null | 
"password" | false | true | "principal" | "keytab" || 2 } private class TestAbstractHadoopProcessor extends AbstractHadoopProcessor { @@ -105,7 +105,7 @@ class AbstractHadoopProcessorSpec extends Specification { } @Override - String getAllowExplicitKeytabEnvironmentVariable() { + boolean isAllowExplicitKeytab() { allowExplicitKeytab } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java index 4aa8e54678..f8622da66a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java @@ -26,9 +26,15 @@ import java.io.File; public class SimpleHadoopProcessor extends AbstractHadoopProcessor { private KerberosProperties testKerberosProperties; + private boolean allowExplicitKeytab; public SimpleHadoopProcessor(KerberosProperties kerberosProperties) { + this(kerberosProperties, true); + } + + public SimpleHadoopProcessor(KerberosProperties kerberosProperties, boolean allowExplicitKeytab) { this.testKerberosProperties = kerberosProperties; + this.allowExplicitKeytab = allowExplicitKeytab; } @Override @@ -40,4 +46,8 @@ public class SimpleHadoopProcessor extends AbstractHadoopProcessor { return testKerberosProperties; } + @Override + boolean isAllowExplicitKeytab() { + return allowExplicitKeytab; + } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java index 378799e104..f84af6f630 100644 --- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java @@ -39,6 +39,9 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosPasswordUser; +import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.util.hive.AuthenticationFailedException; import org.apache.nifi.util.hive.HiveConfigurator; import org.apache.nifi.util.hive.HiveUtils; @@ -59,6 +62,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.expression.ExpressionLanguageScope; +import javax.security.auth.login.LoginException; + /** * Implementation for Database Connection Pooling Service used for Apache Hive * connections. Apache DBCP is used for connection pooling functionality. 
@@ -165,6 +170,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator(); private volatile UserGroupInformation ugi; + private final AtomicReference kerberosUserReference = new AtomicReference<>(); private volatile File kerberosConfigFile = null; private volatile KerberosProperties kerberosProperties; @@ -184,6 +190,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv kerberosProperties = new KerberosProperties(kerberosConfigFile); props.add(kerberosProperties.getKerberosPrincipal()); props.add(kerberosProperties.getKerberosKeytab()); + props.add(kerberosProperties.getKerberosPassword()); properties = props; } @@ -201,37 +208,37 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv if (confFileProvided) { final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue(); final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String resolvedPrincipal; final String resolvedKeytab; - if (credentialsService == null) { - resolvedPrincipal = explicitPrincipal; - resolvedKeytab = explicitKeytab; - } else { + if (credentialsService != null) { resolvedPrincipal = credentialsService.getPrincipal(); resolvedKeytab = credentialsService.getKeytab(); + } else { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; } final String configFiles = 
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); - problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger())); + problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger())); - if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) { problems.add(new ValidationResult.Builder() .subject("Kerberos Credentials") .valid(false) - .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") + .explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password") .build()); } - final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); - if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + if (!isAllowExplicitKeytab() && explicitKeytab != null) { problems.add(new ValidationResult.Builder() .subject("Kerberos Credentials") .valid(false) - .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. " - + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. 
" + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") .build()); } } @@ -293,28 +300,37 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv if (SecurityUtil.isSecurityEnabled(hiveConfig)) { final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPassword = context.getProperty(kerberosProperties.getKerberosPassword()).getValue(); final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String resolvedPrincipal; final String resolvedKeytab; - if (credentialsService == null) { - resolvedPrincipal = explicitPrincipal; - resolvedKeytab = explicitKeytab; - } else { + if (credentialsService != null) { resolvedPrincipal = credentialsService.getPrincipal(); resolvedKeytab = credentialsService.getKeytab(); + } else { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; } - log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + if (resolvedKeytab != null) { + kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab)); + log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + } else if (explicitPassword != null) { + kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword)); + log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal}); + } else { + throw new InitializationException("Unable to authenticate with Kerberos, no keytab or password was provided"); + 
} try { - ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab); + ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get()); } catch (AuthenticationFailedException ae) { log.error(ae.getMessage(), ae); throw new InitializationException(ae); } - getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + getLogger().info("Successfully logged in as principal " + resolvedPrincipal); } final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); @@ -356,13 +372,24 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv public Connection getConnection() throws ProcessException { try { if (ugi != null) { - synchronized(this) { - /* - * Make sure that only one thread can request that the UGI relogin at a time. This - * explicit relogin attempt is necessary due to the Hive client/thrift not implicitly handling - * the acquisition of a new TGT after the current one has expired. - * https://issues.apache.org/jira/browse/NIFI-5134 - */ + /* + * Explicitly check the TGT and relogin if necessary with the KerberosUser instance. No synchronization + * is necessary in the client code, since AbstractKerberosUser's checkTGTAndRelogin method is synchronized. 
+ */ + getLogger().trace("getting UGI instance"); + if (kerberosUserReference.get() != null) { + // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring + KerberosUser kerberosUser = kerberosUserReference.get(); + getLogger().debug("kerberosUser is " + kerberosUser); + try { + getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser}); + kerberosUser.checkTGTAndRelogin(); + } catch (LoginException e) { + throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); + } + } else { + getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser"); + // no synchronization is needed for UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the synchronization internally ugi.checkTGTAndReloginFromKeytab(); } try { @@ -395,4 +422,10 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv return connectionUrl; } + /* + * Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables. 
+ */ + boolean isAllowExplicitKeytab() { + return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB)); + } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index 16271594b8..0ef4211c08 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -62,6 +62,9 @@ import org.apache.nifi.processor.util.pattern.ErrorTypes; import org.apache.nifi.processor.util.pattern.ExceptionHandler; import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.processor.util.pattern.RoutingResult; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosPasswordUser; +import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.util.hive.AuthenticationFailedException; import org.apache.nifi.util.hive.HiveConfigurator; import org.apache.nifi.util.hive.HiveOptions; @@ -70,6 +73,7 @@ import org.apache.nifi.util.hive.HiveWriter; import org.xerial.snappy.Snappy; import org.apache.nifi.util.hive.ValidationResources; +import javax.security.auth.login.LoginException; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -318,6 +322,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator(); protected volatile UserGroupInformation ugi; + final protected AtomicReference kerberosUserReference = new AtomicReference<>(); protected volatile HiveConf hiveConfig; protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false); @@ -353,6 +358,7 @@ public class 
PutHiveStreaming extends AbstractSessionFactoryProcessor { kerberosProperties = new KerberosProperties(kerberosConfigFile); props.add(kerberosProperties.getKerberosPrincipal()); props.add(kerberosProperties.getKerberosKeytab()); + props.add(kerberosProperties.getKerberosPassword()); propertyDescriptors = Collections.unmodifiableList(props); Set _relationships = new HashSet<>(); @@ -382,6 +388,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { if (confFileProvided) { final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue(); final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String resolvedPrincipal; @@ -396,23 +403,22 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); - problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger())); + problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger())); - if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) { problems.add(new ValidationResult.Builder() .subject("Kerberos Credentials") .valid(false) - .explanation("Cannot specify both a Kerberos Credentials Service and a 
principal/keytab") + .explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password") .build()); } - final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); - if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + if (!isAllowExplicitKeytab() && explicitKeytab != null) { problems.add(new ValidationResult.Builder() .subject("Kerberos Credentials") .valid(false) - .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. " - + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. " + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") .build()); } } @@ -446,6 +452,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { if (SecurityUtil.isSecurityEnabled(hiveConfig)) { final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPassword = context.getProperty(kerberosProperties.getKerberosPassword()).getValue(); final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String resolvedPrincipal; @@ -458,16 +465,26 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { resolvedKeytab = credentialsService.getKeytab(); } - log.info("Hive 
Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + if (resolvedKeytab != null) { + kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab)); + log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + } else if (explicitPassword != null) { + kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword)); + log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal}); + } else { + throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided"); + } + try { - ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab); + ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get()); } catch (AuthenticationFailedException ae) { throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae); } - log.info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + log.info("Successfully logged in as principal " + resolvedPrincipal); } else { ugi = null; + kerberosUserReference.set(null); } callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds @@ -967,6 +984,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { } ugi = null; + kerberosUserReference.set(null); } private void setupHeartBeatTimer(int heartbeatInterval) { @@ -1048,7 +1066,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { HiveWriter writer = writers.get(endPoint); if (writer == null) { log.debug("Creating Writer to Hive end point : " + endPoint); - writer = makeHiveWriter(endPoint, callTimeoutPool, ugi, options); + writer = makeHiveWriter(endPoint, callTimeoutPool, getUgi(), options); if (writers.size() > 
(options.getMaxOpenConnections() - 1)) { log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{writers.size(), options.getMaxOpenConnections()}); int retired = retireIdleWriters(writers, options.getIdleTimeout()); @@ -1144,6 +1162,31 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { return kerberosProperties; } + UserGroupInformation getUgi() { + getLogger().trace("getting UGI instance"); + if (kerberosUserReference.get() != null) { + // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring + KerberosUser kerberosUser = kerberosUserReference.get(); + getLogger().debug("kerberosUser is " + kerberosUser); + try { + getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser}); + kerberosUser.checkTGTAndRelogin(); + } catch (LoginException e) { + throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); + } + } else { + getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser"); + } + return ugi; + } + + /* + * Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables. 
+ */ + boolean isAllowExplicitKeytab() { + return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB)); + } + protected class HiveStreamingRecord { private List partitionValues; diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java index 5fed81adaa..b212207ebb 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java @@ -26,6 +26,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.security.krb.KerberosUser; import java.io.IOException; import java.util.ArrayList; @@ -35,7 +36,8 @@ import java.util.concurrent.atomic.AtomicReference; public class HiveConfigurator { - public Collection validate(String configFiles, String principal, String keyTab, AtomicReference validationResourceHolder, ComponentLog log) { + public Collection validate(String configFiles, String principal, String keyTab, String password, + AtomicReference validationResourceHolder, ComponentLog log) { final List problems = new ArrayList<>(); ValidationResources resources = validationResourceHolder.get(); @@ -50,7 +52,7 @@ public class HiveConfigurator { final Configuration hiveConfig = resources.getConfiguration(); - problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, null, log)); + problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log)); return problems; } @@ -74,6 
+76,22 @@ public class HiveConfigurator { } } + /** + * Acquires a {@link UserGroupInformation} using the given {@link Configuration} and {@link KerberosUser}. + * @see SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser) + * @param hiveConfig The Configuration to apply to the acquired UserGroupInformation + * @param kerberosUser The KerberosUser to authenticate + * @return A UserGroupInformation instance created using the Subject of the given KerberosUser + * @throws AuthenticationFailedException if authentication fails + */ + public UserGroupInformation authenticate(final Configuration hiveConfig, KerberosUser kerberosUser) throws AuthenticationFailedException { + try { + return SecurityUtil.getUgiForKerberosUser(hiveConfig, kerberosUser); + } catch (IOException ioe) { + throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe); + } + } + /** * As of Apache NiFi 1.5.0, due to changes made to * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this @@ -91,7 +109,9 @@ public class HiveConfigurator { * authentication attempts that would leave the Hive controller service in an unrecoverable state. 
* * @see SecurityUtil#loginKerberos(Configuration, String, String) + * @deprecated Use {@link SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)} */ + @Deprecated public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab) throws AuthenticationFailedException { UserGroupInformation ugi; try { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java index ed3461d947..3681755669 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java @@ -34,7 +34,10 @@ import org.apache.hive.hcatalog.streaming.StreamingException; import org.apache.hive.hcatalog.streaming.TransactionBatch; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.SecurityUtil; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.security.krb.KerberosPasswordUser; +import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -67,6 +70,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -103,7 +107,11 @@ public class TestPutHiveStreaming { 
when(hiveConfigurator.getConfigurationFromFiles(AdditionalMatchers.or(anyString(), isNull()))).thenReturn(hiveConf); processor.hiveConfigurator = hiveConfigurator; processor.setKerberosProperties(kerberosPropsWithFile); - runner = TestRunners.newTestRunner(processor); + KerberosContext mockKerberosContext = mock(KerberosContext.class); + when(mockKerberosContext.getKerberosConfigurationFile()).thenReturn(kerberosPropsWithFile.getKerberosConfigFile()); + when(mockKerberosContext.getKerberosServiceKeytab()).thenReturn(null); + when(mockKerberosContext.getKerberosServicePrincipal()).thenReturn(null); + runner = TestRunners.newTestRunner(processor, mockKerberosContext); } @Test @@ -118,23 +126,27 @@ public class TestPutHiveStreaming { } @Test - public void testUgiGetsCleared() { + public void testUgiAndKerberosUserGetsCleared() { runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); processor.ugi = mock(UserGroupInformation.class); + processor.kerberosUserReference.set(new KerberosPasswordUser("user", "password")); runner.run(); assertNull(processor.ugi); + assertNull(processor.kerberosUserReference.get()); } @Test public void testUgiGetsSetIfSecure() throws AuthenticationFailedException, IOException { when(hiveConf.get(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION)).thenReturn(SecurityUtil.KERBEROS); ugi = mock(UserGroupInformation.class); - when(hiveConfigurator.authenticate(eq(hiveConf), AdditionalMatchers.or(anyString(), isNull()), AdditionalMatchers.or(anyString(), isNull()))).thenReturn(ugi); + when(hiveConfigurator.authenticate(eq(hiveConf), any(KerberosUser.class))).thenReturn(ugi); runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); + 
runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab"); + runner.setProperty(kerberosPropsWithFile.getKerberosPrincipal(), "principal"); Map user1 = new HashMap() { { put("name", "Joe"); @@ -163,7 +175,7 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.HIVE_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml, src/test/resources/hive-site-security.xml"); runner.setProperty(kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM"); - runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab"); + runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/missing.keytab"); runner.run(); } @@ -1037,6 +1049,10 @@ public class TestPutHiveStreaming { this.generateExceptionOnFlushAndClose = generateExceptionOnFlushAndClose; } + @Override + UserGroupInformation getUgi() { + return ugi; + } } private class MockHiveWriter extends HiveWriter { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java index 01597fab4b..0a49a401c7 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java @@ -43,6 +43,9 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosPasswordUser; +import org.apache.nifi.security.krb.KerberosUser; 
import org.apache.nifi.util.hive.AuthenticationFailedException; import org.apache.nifi.util.hive.HiveConfigurator; import org.apache.nifi.util.hive.HiveUtils; @@ -62,6 +65,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.controller.ControllerServiceInitializationContext; +import javax.security.auth.login.LoginException; + /** * Implementation for Database Connection Pooling Service used for Apache Hive * connections. Apache DBCP is used for connection pooling functionality. @@ -264,6 +269,7 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator(); private volatile UserGroupInformation ugi; + private final AtomicReference kerberosUserReference = new AtomicReference<>(); private volatile File kerberosConfigFile = null; private volatile KerberosProperties kerberosProperties; @@ -289,6 +295,7 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi kerberosProperties = new KerberosProperties(kerberosConfigFile); props.add(kerberosProperties.getKerberosPrincipal()); props.add(kerberosProperties.getKerberosKeytab()); + props.add(kerberosProperties.getKerberosPassword()); properties = props; } @@ -306,38 +313,38 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi if (confFileProvided) { final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue(); final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String 
resolvedPrincipal; final String resolvedKeytab; - if (credentialsService == null) { - resolvedPrincipal = explicitPrincipal; - resolvedKeytab = explicitKeytab; - } else { + if (credentialsService != null) { resolvedPrincipal = credentialsService.getPrincipal(); resolvedKeytab = credentialsService.getKeytab(); + } else { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; } final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); - problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger())); + problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger())); - if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) { problems.add(new ValidationResult.Builder() - .subject("Kerberos Credentials") - .valid(false) - .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") - .build()); + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password") + .build()); } - final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); - if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + if (!isAllowExplicitKeytab() && explicitKeytab != null) { problems.add(new ValidationResult.Builder() - .subject("Kerberos Credentials") - .valid(false) - .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. 
" - + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") - .build()); + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. " + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); } } @@ -398,28 +405,37 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi if (SecurityUtil.isSecurityEnabled(hiveConfig)) { final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPassword = context.getProperty(kerberosProperties.getKerberosPassword()).getValue(); final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String resolvedPrincipal; final String resolvedKeytab; - if (credentialsService == null) { - resolvedPrincipal = explicitPrincipal; - resolvedKeytab = explicitKeytab; - } else { + if (credentialsService != null) { resolvedPrincipal = credentialsService.getPrincipal(); resolvedKeytab = credentialsService.getKeytab(); + } else { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; } - log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + if (resolvedKeytab != null) { + kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab)); + log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + } else if 
(explicitPassword != null) { + kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword)); + log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal}); + } else { + throw new InitializationException("Unable to authenticate with Kerberos, no keytab or password was provided"); + } try { - ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab); + ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get()); } catch (AuthenticationFailedException ae) { log.error(ae.getMessage(), ae); throw new InitializationException(ae); } - getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + getLogger().info("Successfully logged in as principal " + resolvedPrincipal); } final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); @@ -476,13 +492,24 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi public Connection getConnection() throws ProcessException { try { if (ugi != null) { - synchronized(this) { - /* - * Make sure that only one thread can request that the UGI relogin at a time. This - * explicit relogin attempt is necessary due to the Hive client/thrift not implicitly handling - * the acquisition of a new TGT after the current one has expired. - * https://issues.apache.org/jira/browse/NIFI-5134 - */ + /* + * Explicitly check the TGT and relogin if necessary with the KerberosUser instance. No synchronization + * is necessary in the client code, since AbstractKerberosUser's checkTGTAndRelogin method is synchronized. 
+ */ + getLogger().trace("getting UGI instance"); + if (kerberosUserReference.get() != null) { + // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring + KerberosUser kerberosUser = kerberosUserReference.get(); + getLogger().debug("kerberosUser is " + kerberosUser); + try { + getLogger().debug("checking TGT on kerberosUser " + kerberosUser); + kerberosUser.checkTGTAndRelogin(); + } catch (LoginException e) { + throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); + } + } else { + getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser"); + // no synchronization is needed for UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the synchronization internally ugi.checkTGTAndReloginFromKeytab(); } try { @@ -515,4 +542,10 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi return connectionUrl; } + /* + * Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables. 
+ */ + boolean isAllowExplicitKeytab() { + return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB)); + } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java index 535754fb12..a1123d21d1 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java @@ -39,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hadoop.SecurityUtil; @@ -54,6 +55,9 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.pattern.DiscontinuedException; import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosPasswordUser; +import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.util.StringUtils; @@ -201,6 +205,25 @@ public class PutHive3Streaming extends AbstractProcessor { .required(false) .build(); + static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() + .name("kerberos-principal") + .displayName("Kerberos Principal") + 
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder() + .name("kerberos-password") + .displayName("Kerberos Password") + .description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -225,6 +248,7 @@ public class PutHive3Streaming extends AbstractProcessor { protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator(); protected volatile UserGroupInformation ugi; + final protected AtomicReference kerberosUserReference = new AtomicReference<>(); protected volatile HiveConf hiveConfig; protected volatile int callTimeout; @@ -247,6 +271,8 @@ public class PutHive3Streaming extends AbstractProcessor { props.add(DISABLE_STREAMING_OPTIMIZATIONS); props.add(ROLLBACK_ON_FAILURE); props.add(KERBEROS_CREDENTIALS_SERVICE); + props.add(KERBEROS_PRINCIPAL); + props.add(KERBEROS_PASSWORD); propertyDescriptors = Collections.unmodifiableList(props); @@ -274,14 +300,23 @@ public class PutHive3Streaming extends AbstractProcessor { final List problems = new ArrayList<>(); final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + final String explicitPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitPassword = validationContext.getProperty(KERBEROS_PASSWORD).getValue(); - final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null; + final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : explicitPrincipal; final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null; if (confFileProvided) { final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); - problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger())); + problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger())); } + if (credentialsService != null && (explicitPrincipal != null || explicitPassword != null)) { + problems.add(new ValidationResult.Builder() + .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName()) + .valid(false) + .explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time") + .build()); + } return problems; } @@ -310,20 +345,33 @@ public class PutHive3Streaming extends AbstractProcessor { if (SecurityUtil.isSecurityEnabled(hiveConfig)) { final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitPassword = context.getProperty(KERBEROS_PASSWORD).getValue(); - final String resolvedPrincipal = credentialsService.getPrincipal(); - final String resolvedKeytab = credentialsService.getKeytab(); + final String resolvedPrincipal = credentialsService 
!= null ? credentialsService.getPrincipal() : explicitPrincipal; + final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null; + + if (resolvedKeytab != null) { + kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab)); + log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + } else if (explicitPassword != null) { + kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword)); + log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal}); + } else { + throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided"); + } - log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab}); try { - ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab); + ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get()); } catch (AuthenticationFailedException ae) { - throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae); + log.error(ae.getMessage(), ae); + throw new ProcessException(ae); } log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab}); } else { ugi = null; + kerberosUserReference.set(null); } callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds @@ -532,6 +580,7 @@ public class PutHive3Streaming extends AbstractProcessor { } ugi = null; + kerberosUserReference.set(null); } private void abortAndCloseConnection(StreamingConnection connection) { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java index 092557baee..38329449de 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java @@ -26,6 +26,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.security.krb.KerberosUser; import java.io.IOException; import java.util.ArrayList; @@ -38,7 +39,8 @@ import java.util.concurrent.atomic.AtomicReference; */ public class HiveConfigurator { - public Collection validate(String configFiles, String principal, String keyTab, AtomicReference validationResourceHolder, ComponentLog log) { + public Collection validate(String configFiles, String principal, String keyTab, String password, + AtomicReference validationResourceHolder, ComponentLog log) { final List problems = new ArrayList<>(); ValidationResources resources = validationResourceHolder.get(); @@ -53,7 +55,7 @@ public class HiveConfigurator { final Configuration hiveConfig = resources.getConfiguration(); - problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, null, log)); + problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log)); return problems; } @@ -77,6 +79,22 @@ public class HiveConfigurator { } } + /** + * Acquires a {@link UserGroupInformation} using the given {@link Configuration} and {@link KerberosUser}. 
+ * @see SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser) + * @param hiveConfig The Configuration to apply to the acquired UserGroupInformation + * @param kerberosUser The KerberosUser to authenticate + * @return A UserGroupInformation instance created using the Subject of the given KerberosUser + * @throws AuthenticationFailedException if authentication fails + */ + public UserGroupInformation authenticate(final Configuration hiveConfig, KerberosUser kerberosUser) throws AuthenticationFailedException { + try { + return SecurityUtil.getUgiForKerberosUser(hiveConfig, kerberosUser); + } catch (IOException ioe) { + throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe); + } + } + /** * As of Apache NiFi 1.5.0, due to changes made to * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this @@ -94,7 +112,9 @@ public class HiveConfigurator { * authentication attempts that would leave the Hive controller service in an unrecoverable state. 
* * @see SecurityUtil#loginKerberos(Configuration, String, String) + * @deprecated Use {@link SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)} */ + @Deprecated public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab) throws AuthenticationFailedException { UserGroupInformation ugi; try { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java index 16c42e5943..d9113fe786 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java @@ -61,6 +61,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.record.MapRecord; @@ -115,9 +116,12 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -262,7 +266,7 @@ public class TestPutHive3Streaming { } @Test - public void 
testUgiGetsCleared() throws Exception { + public void testUgiAndKerberosUserGetsCleared() throws Exception { configure(processor, 0); runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHive3Streaming.DB_NAME, "default"); @@ -270,6 +274,7 @@ public class TestPutHive3Streaming { processor.ugi = mock(UserGroupInformation.class); runner.run(); assertNull(processor.ugi); + assertNull(processor.kerberosUserReference.get()); } @Test @@ -281,12 +286,13 @@ public class TestPutHive3Streaming { runner.setProperty(KERBEROS_CREDENTIALS_SERVICE, "kcs"); runner.enableControllerService(kcs); ugi = mock(UserGroupInformation.class); - when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString())).thenReturn(ugi); + when(hiveConfigurator.authenticate(eq(hiveConf), any(KerberosUser.class))).thenReturn(ugi); runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHive3Streaming.DB_NAME, "default"); runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); runner.enqueue(new byte[0]); runner.run(); + verify(hiveConfigurator, times(1)).authenticate(eq(hiveConf), any(KerberosUser.class)); } @Test(expected = AssertionError.class) diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java index dd2e1fe606..1e460bccfb 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java @@ -31,12 +31,16 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractControllerService; import 
org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosPasswordUser; +import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.util.hive.AuthenticationFailedException; import org.apache.nifi.util.hive.HiveConfigurator; import org.apache.nifi.util.hive.HiveUtils; @@ -57,6 +61,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.expression.ExpressionLanguageScope; +import javax.security.auth.login.LoginException; + /** * Implementation for Database Connection Pooling Service used for Apache Hive 1.1 * connections. Apache DBCP is used for connection pooling functionality. 
@@ -149,6 +155,25 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements .required(false) .build(); + static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() + .name("kerberos-principal") + .displayName("Kerberos Principal") + .description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder() + .name("kerberos-password") + .displayName("Kerberos Password") + .description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + private List properties; @@ -161,6 +186,7 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator(); private volatile UserGroupInformation ugi; + private final AtomicReference kerberosUserReference = new AtomicReference<>(); @Override protected void init(final ControllerServiceInitializationContext context) { @@ -173,6 +199,8 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements props.add(MAX_TOTAL_CONNECTIONS); props.add(VALIDATION_QUERY); props.add(KERBEROS_CREDENTIALS_SERVICE); + props.add(KERBEROS_PRINCIPAL); + props.add(KERBEROS_PASSWORD); properties = props; } @@ -190,19 +218,29 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements if (confFileProvided) { final KerberosCredentialsService credentialsService = 
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + final String explicitPrincipal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitPassword = validationContext.getProperty(KERBEROS_PASSWORD).getValue(); final String resolvedPrincipal; final String resolvedKeytab; - if (credentialsService == null) { - resolvedPrincipal = null; - resolvedKeytab = null; - } else { + if (credentialsService != null) { resolvedPrincipal = credentialsService.getPrincipal(); resolvedKeytab = credentialsService.getKeytab(); + } else { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = null; } final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); - problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger())); + problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger())); + + if (credentialsService != null && (explicitPrincipal != null || explicitPassword != null)) { + problems.add(new ValidationResult.Builder() + .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName()) + .valid(false) + .explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time") + .build()); + } } return problems; @@ -260,28 +298,38 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements final String drv = HiveDriver.class.getName(); if (SecurityUtil.isSecurityEnabled(hiveConfig)) { + final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitPassword = context.getProperty(KERBEROS_PASSWORD).getValue(); final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String resolvedPrincipal; final String resolvedKeytab; - if (credentialsService == null) { - resolvedPrincipal = null; - resolvedKeytab = null; - } else { + if (credentialsService != null) { resolvedPrincipal = credentialsService.getPrincipal(); resolvedKeytab = credentialsService.getKeytab(); + } else { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = null; } - log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + if (resolvedKeytab != null) { + kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab)); + log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + } else if (explicitPassword != null) { + kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword)); + log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal}); + } else { + throw new InitializationException("Unable to authenticate with Kerberos, no keytab or password was provided"); + } try { - ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab); + ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get()); } catch (AuthenticationFailedException ae) { log.error(ae.getMessage(), ae); throw new InitializationException(ae); } - getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + getLogger().info("Successfully logged in as principal " + resolvedPrincipal); } final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); @@ -325,13 +373,24 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements public Connection getConnection() throws ProcessException { try { 
if (ugi != null) { - synchronized(this) { - /* - * Make sure that only one thread can request that the UGI relogin at a time. This - * explicit relogin attempt is necessary due to the Hive client/thrift not implicitly handling - * the acquisition of a new TGT after the current one has expired. - * https://issues.apache.org/jira/browse/NIFI-5134 - */ + /* + * Explicitly check the TGT and relogin if necessary with the KerberosUser instance. No synchronization + * is necessary in the client code, since AbstractKerberosUser's checkTGTAndRelogin method is synchronized. + */ + getLogger().trace("getting UGI instance"); + if (kerberosUserReference.get() != null) { + // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring + KerberosUser kerberosUser = kerberosUserReference.get(); + getLogger().debug("kerberosUser is " + kerberosUser); + try { + getLogger().debug("checking TGT on kerberosUser " + kerberosUser); + kerberosUser.checkTGTAndRelogin(); + } catch (LoginException e) { + throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); + } + } else { + getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser"); + // no synchronization is needed for UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the synchronization internally ugi.checkTGTAndReloginFromKeytab(); } try { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java index 5fed81adaa..b212207ebb 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java @@ -26,6 +26,7 @@ import 
org.apache.nifi.components.ValidationResult; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.security.krb.KerberosUser; import java.io.IOException; import java.util.ArrayList; @@ -35,7 +36,8 @@ import java.util.concurrent.atomic.AtomicReference; public class HiveConfigurator { - public Collection validate(String configFiles, String principal, String keyTab, AtomicReference validationResourceHolder, ComponentLog log) { + public Collection validate(String configFiles, String principal, String keyTab, String password, + AtomicReference validationResourceHolder, ComponentLog log) { final List problems = new ArrayList<>(); ValidationResources resources = validationResourceHolder.get(); @@ -50,7 +52,7 @@ public class HiveConfigurator { final Configuration hiveConfig = resources.getConfiguration(); - problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, null, log)); + problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log)); return problems; } @@ -74,6 +76,22 @@ public class HiveConfigurator { } } + /** + * Acquires a {@link UserGroupInformation} using the given {@link Configuration} and {@link KerberosUser}. 
+ * @see SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser) + * @param hiveConfig The Configuration to apply to the acquired UserGroupInformation + * @param kerberosUser The KerberosUser to authenticate + * @return A UserGroupInformation instance created using the Subject of the given KerberosUser + * @throws AuthenticationFailedException if authentication fails + */ + public UserGroupInformation authenticate(final Configuration hiveConfig, KerberosUser kerberosUser) throws AuthenticationFailedException { + try { + return SecurityUtil.getUgiForKerberosUser(hiveConfig, kerberosUser); + } catch (IOException ioe) { + throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe); + } + } + /** * As of Apache NiFi 1.5.0, due to changes made to * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this @@ -91,7 +109,9 @@ public class HiveConfigurator { * authentication attempts that would leave the Hive controller service in an unrecoverable state. * * @see SecurityUtil#loginKerberos(Configuration, String, String) + * @deprecated Use {@link SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)} */ + @Deprecated public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab) throws AuthenticationFailedException { UserGroupInformation ugi; try {