mirror of https://github.com/apache/nifi.git
NIFI-7025: Initial commit adding Kerberos Password feature for Hive components
Kerberos Password property should not support EL, this includes a change to KerberosProperties which is also used by the HDFS processors (AbstractHadoopProcessor) Added wiring in a KerberosContext to a TestRunner's MockProcessorInitializationContext Removed synchronization blocks around KerberosUser.checkTGTAndRelogin, since that method is already synchronized Updated AbstractHadoopProcessor to have a boolean accessor method to determine if explicit keytab configuration is allowed Removed synchronization block from HiveConnectionPool's getConnection method (in Hive, Hive_1_1, Hive3 modules), since new TGT ticket acquisition is handled by the KerberosUser implementation. If UGI is used to relogin, synchronization is handled internally by UGI. Added Kerberos Principal and Kerberos Password properties to Hive, Hive_1_1, and Hive3 components Hive, Hive_1_1, and Hive3 components now use KerberosUser implementations to authenticate with a KDC Updated handling of the NIFI_ALLOW_EXPLICIT_KEYTAB environment variable in Hive and Hive3 components. An accessor method has been added that uses Boolean.parseBoolean, which returns true if the environment variable is set to true, and false otherwise (including when the environment variable is unset). Addressing PR feedback Addressing PR feedback This closes #4102.
This commit is contained in:
parent
614136ce51
commit
1678531638
|
@ -82,7 +82,9 @@ public abstract class AbstractKerberosUser implements KerberosUser {
|
|||
loggedIn.set(true);
|
||||
LOGGER.debug("Successful login for {}", new Object[]{principal});
|
||||
} catch (LoginException le) {
|
||||
throw new LoginException("Unable to login with " + principal + " due to: " + le.getMessage());
|
||||
LoginException loginException = new LoginException("Unable to login with " + principal + " due to: " + le.getMessage());
|
||||
loginException.setStackTrace(le.getStackTrace());
|
||||
throw loginException;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package org.apache.nifi.security.krb;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
import javax.security.auth.login.Configuration;
|
||||
|
@ -27,6 +29,7 @@ import java.util.Map;
|
|||
* Custom JAAS Configuration object for a provided principal and keytab.
|
||||
*/
|
||||
public class KeytabConfiguration extends Configuration {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(KeytabConfiguration.class);
|
||||
|
||||
private final String principal;
|
||||
private final String keytabFile;
|
||||
|
@ -63,6 +66,7 @@ public class KeytabConfiguration extends Configuration {
|
|||
final String krbLoginModuleName = ConfigurationUtil.IS_IBM
|
||||
? ConfigurationUtil.IBM_KRB5_LOGIN_MODULE : ConfigurationUtil.SUN_KRB5_LOGIN_MODULE;
|
||||
|
||||
LOGGER.debug("krbLoginModuleName: {}, configuration options: {}", krbLoginModuleName, options);
|
||||
this.kerberosKeytabConfigEntry = new AppConfigurationEntry(
|
||||
krbLoginModuleName, AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.UUID;
|
|||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||
import org.apache.nifi.controller.NodeTypeProvider;
|
||||
import org.apache.nifi.kerberos.KerberosContext;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
|
||||
|
@ -31,15 +32,21 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
|
|||
private final MockComponentLog logger;
|
||||
private final String processorId;
|
||||
private final MockProcessContext context;
|
||||
private final KerberosContext kerberosContext;
|
||||
|
||||
public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context) {
|
||||
this(processor, context, null);
|
||||
this(processor, context, null, null);
|
||||
}
|
||||
|
||||
public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context, final MockComponentLog logger) {
|
||||
this(processor, context, logger, null);
|
||||
}
|
||||
|
||||
public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context, final MockComponentLog logger, KerberosContext kerberosContext) {
|
||||
processorId = UUID.randomUUID().toString();
|
||||
this.logger = logger == null ? new MockComponentLog(processorId, processor) : logger;
|
||||
this.context = context;
|
||||
this.kerberosContext = kerberosContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,16 +101,16 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
|
|||
|
||||
@Override
|
||||
public String getKerberosServicePrincipal() {
|
||||
return null; //this needs to be wired in.
|
||||
return kerberosContext != null ? kerberosContext.getKerberosServicePrincipal() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getKerberosServiceKeytab() {
|
||||
return null; //this needs to be wired in.
|
||||
return kerberosContext != null ? kerberosContext.getKerberosServiceKeytab() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getKerberosConfigurationFile() {
|
||||
return null; //this needs to be wired in.
|
||||
return kerberosContext != null ? kerberosContext.getKerberosConfigurationFile() : null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,6 +64,7 @@ import org.apache.nifi.controller.ControllerService;
|
|||
import org.apache.nifi.controller.queue.QueueSize;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.kerberos.KerberosContext;
|
||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
|
@ -77,6 +78,7 @@ public class StandardProcessorTestRunner implements TestRunner {
|
|||
|
||||
private final Processor processor;
|
||||
private final MockProcessContext context;
|
||||
private final KerberosContext kerberosContext;
|
||||
private final MockFlowFileQueue flowFileQueue;
|
||||
private final SharedSessionState sharedState;
|
||||
private final AtomicLong idGenerator;
|
||||
|
@ -99,10 +101,18 @@ public class StandardProcessorTestRunner implements TestRunner {
|
|||
}
|
||||
|
||||
StandardProcessorTestRunner(final Processor processor, String processorName) {
|
||||
this(processor, processorName, null);
|
||||
this(processor, processorName, null, null);
|
||||
}
|
||||
|
||||
StandardProcessorTestRunner(final Processor processor, String processorName, KerberosContext kerberosContext) {
|
||||
this(processor, processorName, null, kerberosContext);
|
||||
}
|
||||
|
||||
StandardProcessorTestRunner(final Processor processor, String processorName, MockComponentLog logger) {
|
||||
this(processor, processorName, logger, null);
|
||||
}
|
||||
|
||||
StandardProcessorTestRunner(final Processor processor, String processorName, MockComponentLog logger, KerberosContext kerberosContext) {
|
||||
this.processor = processor;
|
||||
this.idGenerator = new AtomicLong(0L);
|
||||
this.sharedState = new SharedSessionState(processor, idGenerator);
|
||||
|
@ -111,8 +121,9 @@ public class StandardProcessorTestRunner implements TestRunner {
|
|||
this.processorStateManager = new MockStateManager(processor);
|
||||
this.variableRegistry = new MockVariableRegistry();
|
||||
this.context = new MockProcessContext(processor, processorName, processorStateManager, variableRegistry);
|
||||
this.kerberosContext = kerberosContext;
|
||||
|
||||
final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context, logger);
|
||||
final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context, logger, kerberosContext);
|
||||
processor.initialize(mockInitContext);
|
||||
this.logger = mockInitContext.getLogger();
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.util;
|
||||
|
||||
import org.apache.nifi.kerberos.KerberosContext;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
|
||||
public class TestRunners {
|
||||
|
@ -30,6 +31,16 @@ public class TestRunners {
|
|||
return newTestRunner(processor,processor.getClass().getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@code TestRunner} for the given {@code Processor} which uses the given {@code KerberosContext}.
|
||||
* @param processor the {@code Processor} under test
|
||||
* @param kerberosContext the {@code KerberosContext} used during the test
|
||||
* @return
|
||||
*/
|
||||
public static TestRunner newTestRunner(final Processor processor, KerberosContext kerberosContext) {
|
||||
return newTestRunner(processor,processor.getClass().getName(), kerberosContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@code TestRunner} for the given {@code Processor}.
|
||||
* The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
|
||||
|
@ -52,6 +63,17 @@ public class TestRunners {
|
|||
return new StandardProcessorTestRunner(processor, name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@code TestRunner} for the given {@code Processor} and {@code KerberosContext}.
|
||||
* @param processor the {@code Processor} under test
|
||||
* @param name the name to give the {@code Processor}
|
||||
* @param kerberosContext the {@code KerberosContext} used during the test
|
||||
* @return a {@code TestRunner}
|
||||
*/
|
||||
public static TestRunner newTestRunner(final Processor processor, String name, KerberosContext kerberosContext) {
|
||||
return new StandardProcessorTestRunner(processor, name, kerberosContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@code TestRunner} for the given {@code Processor}.
|
||||
* The processor name available from {@code TestRunner.getProcessContext().getName()} will be the passed name.
|
||||
|
|
|
@ -104,8 +104,7 @@ public class KerberosProperties {
|
|||
.name("Kerberos Password")
|
||||
.required(false)
|
||||
.description("Kerberos password associated with the principal.")
|
||||
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -78,6 +78,15 @@ public class SecurityUtil {
|
|||
return UserGroupInformation.getCurrentUser();
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticates a {@link KerberosUser} and acquires a {@link UserGroupInformation} instance using {@link UserGroupInformation#getUGIFromSubject(Subject)}.
|
||||
* The {@link UserGroupInformation} will use the given {@link Configuration}.
|
||||
*
|
||||
* @param config The Configuration to apply to the acquired UserGroupInformation instance
|
||||
* @param kerberosUser The KerberosUser to authenticate
|
||||
* @return A UserGroupInformation instance created using the Subject of the given KerberosUser
|
||||
* @throws IOException if authentication fails
|
||||
*/
|
||||
public static synchronized UserGroupInformation getUgiForKerberosUser(final Configuration config, final KerberosUser kerberosUser) throws IOException {
|
||||
UserGroupInformation.setConfiguration(config);
|
||||
try {
|
||||
|
|
|
@ -184,12 +184,11 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
|
||||
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
final String resolvedPrincipal;
|
||||
final String resolvedKeytab;
|
||||
final String resolvedPassword;
|
||||
if (credentialsService == null) {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
|
@ -197,7 +196,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
resolvedPrincipal = credentialsService.getPrincipal();
|
||||
resolvedKeytab = credentialsService.getKeytab();
|
||||
}
|
||||
resolvedPassword = explicitPassword;
|
||||
|
||||
final List<ValidationResult> results = new ArrayList<>();
|
||||
|
||||
|
@ -220,7 +218,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
|
||||
final Configuration conf = resources.getConfiguration();
|
||||
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, resolvedPassword, getLogger()));
|
||||
this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
|
||||
|
||||
} catch (final IOException e) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
|
@ -238,8 +236,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
.build());
|
||||
}
|
||||
|
||||
final String allowExplicitKeytabVariable = getAllowExplicitKeytabEnvironmentVariable();
|
||||
if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && explicitKeytab != null) {
|
||||
if (!isAllowExplicitKeytab() && explicitKeytab != null) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject("Kerberos Credentials")
|
||||
.valid(false)
|
||||
|
@ -390,7 +387,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
if (SecurityUtil.isSecurityEnabled(config)) {
|
||||
String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
String password = context.getProperty(kerberosProperties.getKerberosPassword()).evaluateAttributeExpressions().getValue();
|
||||
String password = context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
|
||||
|
||||
// If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
|
||||
// The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
|
||||
|
@ -545,15 +542,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
if (hdfsResources.get().getKerberosUser() != null) {
|
||||
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
|
||||
KerberosUser kerberosUser = hdfsResources.get().getKerberosUser();
|
||||
synchronized (kerberosUser) {
|
||||
getLogger().debug("kerberosUser is " + kerberosUser);
|
||||
try {
|
||||
getLogger().debug("checking TGT on kerberosUser [{}]" + kerberosUser);
|
||||
getLogger().debug("checking TGT on kerberosUser " + kerberosUser);
|
||||
kerberosUser.checkTGTAndRelogin();
|
||||
} catch (LoginException e) {
|
||||
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
|
||||
}
|
||||
|
@ -563,8 +558,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
/*
|
||||
* Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables.
|
||||
*/
|
||||
String getAllowExplicitKeytabEnvironmentVariable() {
|
||||
return System.getenv(ALLOW_EXPLICIT_KEYTAB);
|
||||
boolean isAllowExplicitKeytab() {
|
||||
return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
|
||||
}
|
||||
|
||||
static protected class HdfsResources {
|
||||
|
|
|
@ -75,21 +75,21 @@ class AbstractHadoopProcessorSpec extends Specification {
|
|||
|
||||
where:
|
||||
testName | configuredPrincipal | configuredKeytab | configuredPassword | allowExplicitKeytab | configuredKeytabCredentialsService | configuredKeytabCredentialsServicePrincipal | configuredKeytabCredentialsServiceKeytab || expectedValidationErrorCount
|
||||
"success case 1" | "principal" | "keytab" | null | "true" | false | null | null || 0
|
||||
"success case 2" | "principal" | null | "password" | "true" | false | null | null || 0
|
||||
"success case 3" | "principal" | null | "password" | "false" | false | null | null || 0
|
||||
"success case 4" | null | null | null | "true" | true | "principal" | "keytab" || 0
|
||||
"success case 5" | null | null | null | "false" | true | "principal" | "keytab" || 0
|
||||
"success case 1" | "principal" | "keytab" | null | true | false | null | null || 0
|
||||
"success case 2" | "principal" | null | "password" | true | false | null | null || 0
|
||||
"success case 3" | "principal" | null | "password" | false | false | null | null || 0
|
||||
"success case 4" | null | null | null | true | true | "principal" | "keytab" || 0
|
||||
"success case 5" | null | null | null | false | true | "principal" | "keytab" || 0
|
||||
// do not allow explicit keytab, but provide one anyway; validation fails
|
||||
"failure case 1" | "principal" | "keytab" | null | "false" | false | null | null || 1
|
||||
"failure case 2" | null | "keytab" | null | "false" | false | null | null || 2
|
||||
"failure case 1" | "principal" | "keytab" | null | false | false | null | null || 1
|
||||
"failure case 2" | null | "keytab" | null | false | false | null | null || 2
|
||||
// keytab credentials service is provided, but explicit properties for principal, password, or keytab are also provided; validation fails
|
||||
"failure case 3" | "principal" | null | null | "true" | true | "principal" | "keytab" || 1
|
||||
"failure case 4" | null | "keytab" | null | "true" | true | "principal" | "keytab" || 1
|
||||
"failure case 5" | null | null | "password" | "true" | true | "principal" | "keytab" || 2
|
||||
"failure case 6" | "principal" | null | null | "false" | true | "principal" | "keytab" || 1
|
||||
"failure case 7" | null | "keytab" | null | "false" | true | "principal" | "keytab" || 2
|
||||
"failure case 8" | null | null | "password" | "false" | true | "principal" | "keytab" || 2
|
||||
"failure case 3" | "principal" | null | null | true | true | "principal" | "keytab" || 1
|
||||
"failure case 4" | null | "keytab" | null | true | true | "principal" | "keytab" || 1
|
||||
"failure case 5" | null | null | "password" | true | true | "principal" | "keytab" || 2
|
||||
"failure case 6" | "principal" | null | null | false | true | "principal" | "keytab" || 1
|
||||
"failure case 7" | null | "keytab" | null | false | true | "principal" | "keytab" || 2
|
||||
"failure case 8" | null | null | "password" | false | true | "principal" | "keytab" || 2
|
||||
}
|
||||
|
||||
private class TestAbstractHadoopProcessor extends AbstractHadoopProcessor {
|
||||
|
@ -105,7 +105,7 @@ class AbstractHadoopProcessorSpec extends Specification {
|
|||
}
|
||||
|
||||
@Override
|
||||
String getAllowExplicitKeytabEnvironmentVariable() {
|
||||
boolean isAllowExplicitKeytab() {
|
||||
allowExplicitKeytab
|
||||
}
|
||||
|
||||
|
|
|
@ -26,9 +26,15 @@ import java.io.File;
|
|||
public class SimpleHadoopProcessor extends AbstractHadoopProcessor {
|
||||
|
||||
private KerberosProperties testKerberosProperties;
|
||||
private boolean allowExplicitKeytab;
|
||||
|
||||
public SimpleHadoopProcessor(KerberosProperties kerberosProperties) {
|
||||
this(kerberosProperties, true);
|
||||
}
|
||||
|
||||
public SimpleHadoopProcessor(KerberosProperties kerberosProperties, boolean allowExplicitKeytab) {
|
||||
this.testKerberosProperties = kerberosProperties;
|
||||
this.allowExplicitKeytab = allowExplicitKeytab;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -40,4 +46,8 @@ public class SimpleHadoopProcessor extends AbstractHadoopProcessor {
|
|||
return testKerberosProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isAllowExplicitKeytab() {
|
||||
return allowExplicitKeytab;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,9 @@ import org.apache.nifi.logging.ComponentLog;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.util.hive.AuthenticationFailedException;
|
||||
import org.apache.nifi.util.hive.HiveConfigurator;
|
||||
import org.apache.nifi.util.hive.HiveUtils;
|
||||
|
@ -59,6 +62,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
|
||||
/**
|
||||
* Implementation for Database Connection Pooling Service used for Apache Hive
|
||||
* connections. Apache DBCP is used for connection pooling functionality.
|
||||
|
@ -165,6 +170,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
|
|||
|
||||
private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
|
||||
private volatile UserGroupInformation ugi;
|
||||
private final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
|
||||
private volatile File kerberosConfigFile = null;
|
||||
private volatile KerberosProperties kerberosProperties;
|
||||
|
||||
|
@ -184,6 +190,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
|
|||
kerberosProperties = new KerberosProperties(kerberosConfigFile);
|
||||
props.add(kerberosProperties.getKerberosPrincipal());
|
||||
props.add(kerberosProperties.getKerberosKeytab());
|
||||
props.add(kerberosProperties.getKerberosPassword());
|
||||
properties = props;
|
||||
}
|
||||
|
||||
|
@ -201,36 +208,36 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
|
|||
if (confFileProvided) {
|
||||
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
|
||||
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
final String resolvedPrincipal;
|
||||
final String resolvedKeytab;
|
||||
if (credentialsService == null) {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
} else {
|
||||
if (credentialsService != null) {
|
||||
resolvedPrincipal = credentialsService.getPrincipal();
|
||||
resolvedKeytab = credentialsService.getKeytab();
|
||||
} else {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
}
|
||||
|
||||
|
||||
final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger()));
|
||||
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject("Kerberos Credentials")
|
||||
.valid(false)
|
||||
.explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
|
||||
.explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
|
||||
.build());
|
||||
}
|
||||
|
||||
final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
|
||||
if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
if (!isAllowExplicitKeytab() && explicitKeytab != null) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject("Kerberos Credentials")
|
||||
.valid(false)
|
||||
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
|
||||
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. "
|
||||
+ "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
|
||||
.build());
|
||||
}
|
||||
|
@ -293,28 +300,37 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
|
|||
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
|
||||
final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
|
||||
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
final String resolvedPrincipal;
|
||||
final String resolvedKeytab;
|
||||
if (credentialsService == null) {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
} else {
|
||||
if (credentialsService != null) {
|
||||
resolvedPrincipal = credentialsService.getPrincipal();
|
||||
resolvedKeytab = credentialsService.getKeytab();
|
||||
} else {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
}
|
||||
|
||||
if (resolvedKeytab != null) {
|
||||
kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
|
||||
} else if (explicitPassword != null) {
|
||||
kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
|
||||
} else {
|
||||
throw new InitializationException("Unable to authenticate with Kerberos, no keytab or password was provided");
|
||||
}
|
||||
|
||||
try {
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get());
|
||||
} catch (AuthenticationFailedException ae) {
|
||||
log.error(ae.getMessage(), ae);
|
||||
throw new InitializationException(ae);
|
||||
}
|
||||
|
||||
getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
|
||||
getLogger().info("Successfully logged in as principal " + resolvedPrincipal);
|
||||
}
|
||||
|
||||
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
|
||||
|
@ -356,13 +372,24 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
|
|||
public Connection getConnection() throws ProcessException {
|
||||
try {
|
||||
if (ugi != null) {
|
||||
synchronized(this) {
|
||||
/*
|
||||
* Make sure that only one thread can request that the UGI relogin at a time. This
|
||||
* explicit relogin attempt is necessary due to the Hive client/thrift not implicitly handling
|
||||
* the acquisition of a new TGT after the current one has expired.
|
||||
* https://issues.apache.org/jira/browse/NIFI-5134
|
||||
* Explicitly check the TGT and relogin if necessary with the KerberosUser instance. No synchronization
|
||||
* is necessary in the client code, since AbstractKerberosUser's checkTGTAndRelogin method is synchronized.
|
||||
*/
|
||||
getLogger().trace("getting UGI instance");
|
||||
if (kerberosUserReference.get() != null) {
|
||||
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
|
||||
KerberosUser kerberosUser = kerberosUserReference.get();
|
||||
getLogger().debug("kerberosUser is " + kerberosUser);
|
||||
try {
|
||||
getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
|
||||
kerberosUser.checkTGTAndRelogin();
|
||||
} catch (LoginException e) {
|
||||
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
|
||||
}
|
||||
} else {
|
||||
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
|
||||
// no synchronization is needed for UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the synchronization internally
|
||||
ugi.checkTGTAndReloginFromKeytab();
|
||||
}
|
||||
try {
|
||||
|
@ -395,4 +422,10 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
|
|||
return connectionUrl;
|
||||
}
|
||||
|
||||
/*
|
||||
* Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables.
|
||||
*/
|
||||
boolean isAllowExplicitKeytab() {
|
||||
return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,6 +62,9 @@ import org.apache.nifi.processor.util.pattern.ErrorTypes;
|
|||
import org.apache.nifi.processor.util.pattern.ExceptionHandler;
|
||||
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
|
||||
import org.apache.nifi.processor.util.pattern.RoutingResult;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.util.hive.AuthenticationFailedException;
|
||||
import org.apache.nifi.util.hive.HiveConfigurator;
|
||||
import org.apache.nifi.util.hive.HiveOptions;
|
||||
|
@ -70,6 +73,7 @@ import org.apache.nifi.util.hive.HiveWriter;
|
|||
import org.xerial.snappy.Snappy;
|
||||
import org.apache.nifi.util.hive.ValidationResources;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -318,6 +322,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
|
|||
|
||||
protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
|
||||
protected volatile UserGroupInformation ugi;
|
||||
final protected AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
|
||||
protected volatile HiveConf hiveConfig;
|
||||
|
||||
protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
|
||||
|
@ -353,6 +358,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
|
|||
kerberosProperties = new KerberosProperties(kerberosConfigFile);
|
||||
props.add(kerberosProperties.getKerberosPrincipal());
|
||||
props.add(kerberosProperties.getKerberosKeytab());
|
||||
props.add(kerberosProperties.getKerberosPassword());
|
||||
propertyDescriptors = Collections.unmodifiableList(props);
|
||||
|
||||
Set<Relationship> _relationships = new HashSet<>();
|
||||
|
@ -382,6 +388,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
|
|||
if (confFileProvided) {
|
||||
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
|
||||
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
final String resolvedPrincipal;
|
||||
|
@ -396,22 +403,21 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
|
|||
|
||||
|
||||
final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger()));
|
||||
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject("Kerberos Credentials")
|
||||
.valid(false)
|
||||
.explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
|
||||
.explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
|
||||
.build());
|
||||
}
|
||||
|
||||
final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
|
||||
if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
if (!isAllowExplicitKeytab() && explicitKeytab != null) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject("Kerberos Credentials")
|
||||
.valid(false)
|
||||
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
|
||||
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. "
|
||||
+ "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
|
||||
.build());
|
||||
}
|
||||
|
@ -446,6 +452,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
|
|||
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
|
||||
final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
|
||||
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
final String resolvedPrincipal;
|
||||
|
@ -458,16 +465,26 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
|
|||
resolvedKeytab = credentialsService.getKeytab();
|
||||
}
|
||||
|
||||
if (resolvedKeytab != null) {
|
||||
kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
|
||||
} else if (explicitPassword != null) {
|
||||
kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
|
||||
} else {
|
||||
throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
|
||||
}
|
||||
|
||||
try {
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get());
|
||||
} catch (AuthenticationFailedException ae) {
|
||||
throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
|
||||
}
|
||||
|
||||
log.info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
|
||||
log.info("Successfully logged in as principal " + resolvedPrincipal);
|
||||
} else {
|
||||
ugi = null;
|
||||
kerberosUserReference.set(null);
|
||||
}
|
||||
|
||||
callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
|
||||
|
@ -967,6 +984,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
|
|||
}
|
||||
|
||||
ugi = null;
|
||||
kerberosUserReference.set(null);
|
||||
}
|
||||
|
||||
private void setupHeartBeatTimer(int heartbeatInterval) {
|
||||
|
@ -1048,7 +1066,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
|
|||
HiveWriter writer = writers.get(endPoint);
|
||||
if (writer == null) {
|
||||
log.debug("Creating Writer to Hive end point : " + endPoint);
|
||||
writer = makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
|
||||
writer = makeHiveWriter(endPoint, callTimeoutPool, getUgi(), options);
|
||||
if (writers.size() > (options.getMaxOpenConnections() - 1)) {
|
||||
log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{writers.size(), options.getMaxOpenConnections()});
|
||||
int retired = retireIdleWriters(writers, options.getIdleTimeout());
|
||||
|
@ -1144,6 +1162,31 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
|
|||
return kerberosProperties;
|
||||
}
|
||||
|
||||
UserGroupInformation getUgi() {
|
||||
getLogger().trace("getting UGI instance");
|
||||
if (kerberosUserReference.get() != null) {
|
||||
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
|
||||
KerberosUser kerberosUser = kerberosUserReference.get();
|
||||
getLogger().debug("kerberosUser is " + kerberosUser);
|
||||
try {
|
||||
getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
|
||||
kerberosUser.checkTGTAndRelogin();
|
||||
} catch (LoginException e) {
|
||||
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
|
||||
}
|
||||
} else {
|
||||
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
|
||||
}
|
||||
return ugi;
|
||||
}
|
||||
|
||||
/*
|
||||
* Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables.
|
||||
*/
|
||||
boolean isAllowExplicitKeytab() {
|
||||
return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
|
||||
}
|
||||
|
||||
protected class HiveStreamingRecord {
|
||||
|
||||
private List<String> partitionValues;
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.components.ValidationResult;
|
|||
import org.apache.nifi.hadoop.KerberosProperties;
|
||||
import org.apache.nifi.hadoop.SecurityUtil;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -35,7 +36,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
public class HiveConfigurator {
|
||||
|
||||
public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
|
||||
public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, String password,
|
||||
AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
|
||||
|
||||
final List<ValidationResult> problems = new ArrayList<>();
|
||||
ValidationResources resources = validationResourceHolder.get();
|
||||
|
@ -50,7 +52,7 @@ public class HiveConfigurator {
|
|||
|
||||
final Configuration hiveConfig = resources.getConfiguration();
|
||||
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, null, log));
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log));
|
||||
|
||||
return problems;
|
||||
}
|
||||
|
@ -74,6 +76,22 @@ public class HiveConfigurator {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a {@link UserGroupInformation} using the given {@link Configuration} and {@link KerberosUser}.
|
||||
* @see SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)
|
||||
* @param hiveConfig The Configuration to apply to the acquired UserGroupInformation
|
||||
* @param kerberosUser The KerberosUser to authenticate
|
||||
* @return A UserGroupInformation instance created using the Subject of the given KerberosUser
|
||||
* @throws AuthenticationFailedException if authentication fails
|
||||
*/
|
||||
public UserGroupInformation authenticate(final Configuration hiveConfig, KerberosUser kerberosUser) throws AuthenticationFailedException {
|
||||
try {
|
||||
return SecurityUtil.getUgiForKerberosUser(hiveConfig, kerberosUser);
|
||||
} catch (IOException ioe) {
|
||||
throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* As of Apache NiFi 1.5.0, due to changes made to
|
||||
* {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
|
||||
|
@ -91,7 +109,9 @@ public class HiveConfigurator {
|
|||
* authentication attempts that would leave the Hive controller service in an unrecoverable state.
|
||||
*
|
||||
* @see SecurityUtil#loginKerberos(Configuration, String, String)
|
||||
* @deprecated Use {@link SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)}
|
||||
*/
|
||||
@Deprecated
|
||||
public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab) throws AuthenticationFailedException {
|
||||
UserGroupInformation ugi;
|
||||
try {
|
||||
|
|
|
@ -34,7 +34,10 @@ import org.apache.hive.hcatalog.streaming.StreamingException;
|
|||
import org.apache.hive.hcatalog.streaming.TransactionBatch;
|
||||
import org.apache.nifi.hadoop.KerberosProperties;
|
||||
import org.apache.nifi.hadoop.SecurityUtil;
|
||||
import org.apache.nifi.kerberos.KerberosContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
@ -67,6 +70,7 @@ import static org.junit.Assert.assertNotNull;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.isNull;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
|
@ -103,7 +107,11 @@ public class TestPutHiveStreaming {
|
|||
when(hiveConfigurator.getConfigurationFromFiles(AdditionalMatchers.or(anyString(), isNull()))).thenReturn(hiveConf);
|
||||
processor.hiveConfigurator = hiveConfigurator;
|
||||
processor.setKerberosProperties(kerberosPropsWithFile);
|
||||
runner = TestRunners.newTestRunner(processor);
|
||||
KerberosContext mockKerberosContext = mock(KerberosContext.class);
|
||||
when(mockKerberosContext.getKerberosConfigurationFile()).thenReturn(kerberosPropsWithFile.getKerberosConfigFile());
|
||||
when(mockKerberosContext.getKerberosServiceKeytab()).thenReturn(null);
|
||||
when(mockKerberosContext.getKerberosServicePrincipal()).thenReturn(null);
|
||||
runner = TestRunners.newTestRunner(processor, mockKerberosContext);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -118,23 +126,27 @@ public class TestPutHiveStreaming {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testUgiGetsCleared() {
|
||||
public void testUgiAndKerberosUserGetsCleared() {
|
||||
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
|
||||
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
|
||||
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
|
||||
processor.ugi = mock(UserGroupInformation.class);
|
||||
processor.kerberosUserReference.set(new KerberosPasswordUser("user", "password"));
|
||||
runner.run();
|
||||
assertNull(processor.ugi);
|
||||
assertNull(processor.kerberosUserReference.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUgiGetsSetIfSecure() throws AuthenticationFailedException, IOException {
|
||||
when(hiveConf.get(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION)).thenReturn(SecurityUtil.KERBEROS);
|
||||
ugi = mock(UserGroupInformation.class);
|
||||
when(hiveConfigurator.authenticate(eq(hiveConf), AdditionalMatchers.or(anyString(), isNull()), AdditionalMatchers.or(anyString(), isNull()))).thenReturn(ugi);
|
||||
when(hiveConfigurator.authenticate(eq(hiveConf), any(KerberosUser.class))).thenReturn(ugi);
|
||||
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
|
||||
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
|
||||
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
|
||||
runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab");
|
||||
runner.setProperty(kerberosPropsWithFile.getKerberosPrincipal(), "principal");
|
||||
Map<String, Object> user1 = new HashMap<String, Object>() {
|
||||
{
|
||||
put("name", "Joe");
|
||||
|
@ -163,7 +175,7 @@ public class TestPutHiveStreaming {
|
|||
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
|
||||
runner.setProperty(PutHiveStreaming.HIVE_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml, src/test/resources/hive-site-security.xml");
|
||||
runner.setProperty(kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM");
|
||||
runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab");
|
||||
runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/missing.keytab");
|
||||
runner.run();
|
||||
}
|
||||
|
||||
|
@ -1037,6 +1049,10 @@ public class TestPutHiveStreaming {
|
|||
this.generateExceptionOnFlushAndClose = generateExceptionOnFlushAndClose;
|
||||
}
|
||||
|
||||
@Override
|
||||
UserGroupInformation getUgi() {
|
||||
return ugi;
|
||||
}
|
||||
}
|
||||
|
||||
private class MockHiveWriter extends HiveWriter {
|
||||
|
|
|
@ -43,6 +43,9 @@ import org.apache.nifi.logging.ComponentLog;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.util.hive.AuthenticationFailedException;
|
||||
import org.apache.nifi.util.hive.HiveConfigurator;
|
||||
import org.apache.nifi.util.hive.HiveUtils;
|
||||
|
@ -62,6 +65,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
|
||||
/**
|
||||
* Implementation for Database Connection Pooling Service used for Apache Hive
|
||||
* connections. Apache DBCP is used for connection pooling functionality.
|
||||
|
@ -264,6 +269,7 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
|
|||
|
||||
private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
|
||||
private volatile UserGroupInformation ugi;
|
||||
private final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
|
||||
private volatile File kerberosConfigFile = null;
|
||||
private volatile KerberosProperties kerberosProperties;
|
||||
|
||||
|
@ -289,6 +295,7 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
|
|||
kerberosProperties = new KerberosProperties(kerberosConfigFile);
|
||||
props.add(kerberosProperties.getKerberosPrincipal());
|
||||
props.add(kerberosProperties.getKerberosKeytab());
|
||||
props.add(kerberosProperties.getKerberosPassword());
|
||||
properties = props;
|
||||
}
|
||||
|
||||
|
@ -306,36 +313,36 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
|
|||
if (confFileProvided) {
|
||||
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
|
||||
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
final String resolvedPrincipal;
|
||||
final String resolvedKeytab;
|
||||
if (credentialsService == null) {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
} else {
|
||||
if (credentialsService != null) {
|
||||
resolvedPrincipal = credentialsService.getPrincipal();
|
||||
resolvedKeytab = credentialsService.getKeytab();
|
||||
} else {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
}
|
||||
|
||||
|
||||
final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger()));
|
||||
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject("Kerberos Credentials")
|
||||
.valid(false)
|
||||
.explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
|
||||
.explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
|
||||
.build());
|
||||
}
|
||||
|
||||
final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
|
||||
if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
if (!isAllowExplicitKeytab() && explicitKeytab != null) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject("Kerberos Credentials")
|
||||
.valid(false)
|
||||
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
|
||||
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. "
|
||||
+ "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
|
||||
.build());
|
||||
}
|
||||
|
@ -398,28 +405,37 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
|
|||
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
|
||||
final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
|
||||
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
final String resolvedPrincipal;
|
||||
final String resolvedKeytab;
|
||||
if (credentialsService == null) {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
} else {
|
||||
if (credentialsService != null) {
|
||||
resolvedPrincipal = credentialsService.getPrincipal();
|
||||
resolvedKeytab = credentialsService.getKeytab();
|
||||
} else {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
}
|
||||
|
||||
if (resolvedKeytab != null) {
|
||||
kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
|
||||
} else if (explicitPassword != null) {
|
||||
kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
|
||||
} else {
|
||||
throw new InitializationException("Unable to authenticate with Kerberos, no keytab or password was provided");
|
||||
}
|
||||
|
||||
try {
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get());
|
||||
} catch (AuthenticationFailedException ae) {
|
||||
log.error(ae.getMessage(), ae);
|
||||
throw new InitializationException(ae);
|
||||
}
|
||||
|
||||
getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
|
||||
getLogger().info("Successfully logged in as principal " + resolvedPrincipal);
|
||||
}
|
||||
|
||||
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
|
||||
|
@ -476,13 +492,24 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
|
|||
public Connection getConnection() throws ProcessException {
|
||||
try {
|
||||
if (ugi != null) {
|
||||
synchronized(this) {
|
||||
/*
|
||||
* Make sure that only one thread can request that the UGI relogin at a time. This
|
||||
* explicit relogin attempt is necessary due to the Hive client/thrift not implicitly handling
|
||||
* the acquisition of a new TGT after the current one has expired.
|
||||
* https://issues.apache.org/jira/browse/NIFI-5134
|
||||
* Explicitly check the TGT and relogin if necessary with the KerberosUser instance. No synchronization
|
||||
* is necessary in the client code, since AbstractKerberosUser's checkTGTAndRelogin method is synchronized.
|
||||
*/
|
||||
getLogger().trace("getting UGI instance");
|
||||
if (kerberosUserReference.get() != null) {
|
||||
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
|
||||
KerberosUser kerberosUser = kerberosUserReference.get();
|
||||
getLogger().debug("kerberosUser is " + kerberosUser);
|
||||
try {
|
||||
getLogger().debug("checking TGT on kerberosUser " + kerberosUser);
|
||||
kerberosUser.checkTGTAndRelogin();
|
||||
} catch (LoginException e) {
|
||||
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
|
||||
}
|
||||
} else {
|
||||
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
|
||||
// no synchronization is needed for UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the synchronization internally
|
||||
ugi.checkTGTAndReloginFromKeytab();
|
||||
}
|
||||
try {
|
||||
|
@ -515,4 +542,10 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
|
|||
return connectionUrl;
|
||||
}
|
||||
|
||||
/*
|
||||
* Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables.
|
||||
*/
|
||||
boolean isAllowExplicitKeytab() {
|
||||
return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
|
|||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.hadoop.SecurityUtil;
|
||||
|
@ -54,6 +55,9 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
|
||||
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
|
||||
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
@ -201,6 +205,25 @@ public class PutHive3Streaming extends AbstractProcessor {
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-principal")
|
||||
.displayName("Kerberos Principal")
|
||||
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-password")
|
||||
.displayName("Kerberos Password")
|
||||
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
// Relationships
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
|
@ -225,6 +248,7 @@ public class PutHive3Streaming extends AbstractProcessor {
|
|||
|
||||
protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
|
||||
protected volatile UserGroupInformation ugi;
|
||||
final protected AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
|
||||
protected volatile HiveConf hiveConfig;
|
||||
|
||||
protected volatile int callTimeout;
|
||||
|
@ -247,6 +271,8 @@ public class PutHive3Streaming extends AbstractProcessor {
|
|||
props.add(DISABLE_STREAMING_OPTIMIZATIONS);
|
||||
props.add(ROLLBACK_ON_FAILURE);
|
||||
props.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
props.add(KERBEROS_PRINCIPAL);
|
||||
props.add(KERBEROS_PASSWORD);
|
||||
|
||||
propertyDescriptors = Collections.unmodifiableList(props);
|
||||
|
||||
|
@ -274,14 +300,23 @@ public class PutHive3Streaming extends AbstractProcessor {
|
|||
final List<ValidationResult> problems = new ArrayList<>();
|
||||
|
||||
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
final String explicitPrincipal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = validationContext.getProperty(KERBEROS_PASSWORD).getValue();
|
||||
|
||||
final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : null;
|
||||
final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : explicitPrincipal;
|
||||
final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
|
||||
if (confFileProvided) {
|
||||
final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger()));
|
||||
}
|
||||
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitPassword != null)) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time")
|
||||
.build());
|
||||
}
|
||||
return problems;
|
||||
}
|
||||
|
||||
|
@ -310,20 +345,33 @@ public class PutHive3Streaming extends AbstractProcessor {
|
|||
|
||||
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
|
||||
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
||||
|
||||
final String resolvedPrincipal = credentialsService.getPrincipal();
|
||||
final String resolvedKeytab = credentialsService.getKeytab();
|
||||
final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : explicitPrincipal;
|
||||
final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
|
||||
|
||||
if (resolvedKeytab != null) {
|
||||
kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
|
||||
} else if (explicitPassword != null) {
|
||||
kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
|
||||
} else {
|
||||
throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
|
||||
}
|
||||
|
||||
try {
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get());
|
||||
} catch (AuthenticationFailedException ae) {
|
||||
throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
|
||||
log.error(ae.getMessage(), ae);
|
||||
throw new ProcessException(ae);
|
||||
}
|
||||
|
||||
log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
|
||||
} else {
|
||||
ugi = null;
|
||||
kerberosUserReference.set(null);
|
||||
}
|
||||
|
||||
callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
|
||||
|
@ -532,6 +580,7 @@ public class PutHive3Streaming extends AbstractProcessor {
|
|||
}
|
||||
|
||||
ugi = null;
|
||||
kerberosUserReference.set(null);
|
||||
}
|
||||
|
||||
private void abortAndCloseConnection(StreamingConnection connection) {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.components.ValidationResult;
|
|||
import org.apache.nifi.hadoop.KerberosProperties;
|
||||
import org.apache.nifi.hadoop.SecurityUtil;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -38,7 +39,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
*/
|
||||
public class HiveConfigurator {
|
||||
|
||||
public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
|
||||
public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, String password,
|
||||
AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
|
||||
|
||||
final List<ValidationResult> problems = new ArrayList<>();
|
||||
ValidationResources resources = validationResourceHolder.get();
|
||||
|
@ -53,7 +55,7 @@ public class HiveConfigurator {
|
|||
|
||||
final Configuration hiveConfig = resources.getConfiguration();
|
||||
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, null, log));
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log));
|
||||
|
||||
return problems;
|
||||
}
|
||||
|
@ -77,6 +79,22 @@ public class HiveConfigurator {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a {@link UserGroupInformation} using the given {@link Configuration} and {@link KerberosUser}.
|
||||
* @see SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)
|
||||
* @param hiveConfig The Configuration to apply to the acquired UserGroupInformation
|
||||
* @param kerberosUser The KerberosUser to authenticate
|
||||
* @return A UserGroupInformation instance created using the Subject of the given KerberosUser
|
||||
* @throws AuthenticationFailedException if authentication fails
|
||||
*/
|
||||
public UserGroupInformation authenticate(final Configuration hiveConfig, KerberosUser kerberosUser) throws AuthenticationFailedException {
|
||||
try {
|
||||
return SecurityUtil.getUgiForKerberosUser(hiveConfig, kerberosUser);
|
||||
} catch (IOException ioe) {
|
||||
throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* As of Apache NiFi 1.5.0, due to changes made to
|
||||
* {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
|
||||
|
@ -94,7 +112,9 @@ public class HiveConfigurator {
|
|||
* authentication attempts that would leave the Hive controller service in an unrecoverable state.
|
||||
*
|
||||
* @see SecurityUtil#loginKerberos(Configuration, String, String)
|
||||
* @deprecated Use {@link SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)}
|
||||
*/
|
||||
@Deprecated
|
||||
public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab) throws AuthenticationFailedException {
|
||||
UserGroupInformation ugi;
|
||||
try {
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.schema.access.SchemaAccessUtils;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
|
@ -115,9 +116,12 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
|
@ -262,7 +266,7 @@ public class TestPutHive3Streaming {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testUgiGetsCleared() throws Exception {
|
||||
public void testUgiAndKerberosUserGetsCleared() throws Exception {
|
||||
configure(processor, 0);
|
||||
runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
|
||||
runner.setProperty(PutHive3Streaming.DB_NAME, "default");
|
||||
|
@ -270,6 +274,7 @@ public class TestPutHive3Streaming {
|
|||
processor.ugi = mock(UserGroupInformation.class);
|
||||
runner.run();
|
||||
assertNull(processor.ugi);
|
||||
assertNull(processor.kerberosUserReference.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -281,12 +286,13 @@ public class TestPutHive3Streaming {
|
|||
runner.setProperty(KERBEROS_CREDENTIALS_SERVICE, "kcs");
|
||||
runner.enableControllerService(kcs);
|
||||
ugi = mock(UserGroupInformation.class);
|
||||
when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString())).thenReturn(ugi);
|
||||
when(hiveConfigurator.authenticate(eq(hiveConf), any(KerberosUser.class))).thenReturn(ugi);
|
||||
runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
|
||||
runner.setProperty(PutHive3Streaming.DB_NAME, "default");
|
||||
runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
|
||||
runner.enqueue(new byte[0]);
|
||||
runner.run();
|
||||
verify(hiveConfigurator, times(1)).authenticate(eq(hiveConf), any(KerberosUser.class));
|
||||
}
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
|
|
|
@ -31,12 +31,16 @@ import org.apache.nifi.components.ValidationContext;
|
|||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.hadoop.SecurityUtil;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.util.hive.AuthenticationFailedException;
|
||||
import org.apache.nifi.util.hive.HiveConfigurator;
|
||||
import org.apache.nifi.util.hive.HiveUtils;
|
||||
|
@ -57,6 +61,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
|
||||
/**
|
||||
* Implementation for Database Connection Pooling Service used for Apache Hive 1.1
|
||||
* connections. Apache DBCP is used for connection pooling functionality.
|
||||
|
@ -149,6 +155,25 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-principal")
|
||||
.displayName("Kerberos Principal")
|
||||
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-password")
|
||||
.displayName("Kerberos Password")
|
||||
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
|
@ -161,6 +186,7 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
|
|||
|
||||
private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
|
||||
private volatile UserGroupInformation ugi;
|
||||
private final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
protected void init(final ControllerServiceInitializationContext context) {
|
||||
|
@ -173,6 +199,8 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
|
|||
props.add(MAX_TOTAL_CONNECTIONS);
|
||||
props.add(VALIDATION_QUERY);
|
||||
props.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
props.add(KERBEROS_PRINCIPAL);
|
||||
props.add(KERBEROS_PASSWORD);
|
||||
|
||||
properties = props;
|
||||
}
|
||||
|
@ -190,19 +218,29 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
|
|||
|
||||
if (confFileProvided) {
|
||||
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
final String explicitPrincipal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = validationContext.getProperty(KERBEROS_PASSWORD).getValue();
|
||||
|
||||
final String resolvedPrincipal;
|
||||
final String resolvedKeytab;
|
||||
if (credentialsService == null) {
|
||||
resolvedPrincipal = null;
|
||||
resolvedKeytab = null;
|
||||
} else {
|
||||
if (credentialsService != null) {
|
||||
resolvedPrincipal = credentialsService.getPrincipal();
|
||||
resolvedKeytab = credentialsService.getKeytab();
|
||||
} else {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = null;
|
||||
}
|
||||
|
||||
final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
|
||||
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger()));
|
||||
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitPassword != null)) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time")
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
return problems;
|
||||
|
@ -260,28 +298,38 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
|
|||
|
||||
final String drv = HiveDriver.class.getName();
|
||||
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
|
||||
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
||||
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
final String resolvedPrincipal;
|
||||
final String resolvedKeytab;
|
||||
if (credentialsService == null) {
|
||||
resolvedPrincipal = null;
|
||||
resolvedKeytab = null;
|
||||
} else {
|
||||
if (credentialsService != null) {
|
||||
resolvedPrincipal = credentialsService.getPrincipal();
|
||||
resolvedKeytab = credentialsService.getKeytab();
|
||||
} else {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = null;
|
||||
}
|
||||
|
||||
if (resolvedKeytab != null) {
|
||||
kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
|
||||
} else if (explicitPassword != null) {
|
||||
kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
|
||||
log.info("Hive Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
|
||||
} else {
|
||||
throw new InitializationException("Unable to authenticate with Kerberos, no keytab or password was provided");
|
||||
}
|
||||
|
||||
try {
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab);
|
||||
ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get());
|
||||
} catch (AuthenticationFailedException ae) {
|
||||
log.error(ae.getMessage(), ae);
|
||||
throw new InitializationException(ae);
|
||||
}
|
||||
|
||||
getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
|
||||
getLogger().info("Successfully logged in as principal " + resolvedPrincipal);
|
||||
}
|
||||
|
||||
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
|
||||
|
@ -325,13 +373,24 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
|
|||
public Connection getConnection() throws ProcessException {
|
||||
try {
|
||||
if (ugi != null) {
|
||||
synchronized(this) {
|
||||
/*
|
||||
* Make sure that only one thread can request that the UGI relogin at a time. This
|
||||
* explicit relogin attempt is necessary due to the Hive client/thrift not implicitly handling
|
||||
* the acquisition of a new TGT after the current one has expired.
|
||||
* https://issues.apache.org/jira/browse/NIFI-5134
|
||||
* Explicitly check the TGT and relogin if necessary with the KerberosUser instance. No synchronization
|
||||
* is necessary in the client code, since AbstractKerberosUser's checkTGTAndRelogin method is synchronized.
|
||||
*/
|
||||
getLogger().trace("getting UGI instance");
|
||||
if (kerberosUserReference.get() != null) {
|
||||
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
|
||||
KerberosUser kerberosUser = kerberosUserReference.get();
|
||||
getLogger().debug("kerberosUser is " + kerberosUser);
|
||||
try {
|
||||
getLogger().debug("checking TGT on kerberosUser " + kerberosUser);
|
||||
kerberosUser.checkTGTAndRelogin();
|
||||
} catch (LoginException e) {
|
||||
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
|
||||
}
|
||||
} else {
|
||||
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
|
||||
// no synchronization is needed for UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the synchronization internally
|
||||
ugi.checkTGTAndReloginFromKeytab();
|
||||
}
|
||||
try {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.components.ValidationResult;
|
|||
import org.apache.nifi.hadoop.KerberosProperties;
|
||||
import org.apache.nifi.hadoop.SecurityUtil;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -35,7 +36,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
public class HiveConfigurator {
|
||||
|
||||
public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
|
||||
public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, String password,
|
||||
AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
|
||||
|
||||
final List<ValidationResult> problems = new ArrayList<>();
|
||||
ValidationResources resources = validationResourceHolder.get();
|
||||
|
@ -50,7 +52,7 @@ public class HiveConfigurator {
|
|||
|
||||
final Configuration hiveConfig = resources.getConfiguration();
|
||||
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, null, log));
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log));
|
||||
|
||||
return problems;
|
||||
}
|
||||
|
@ -74,6 +76,22 @@ public class HiveConfigurator {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a {@link UserGroupInformation} using the given {@link Configuration} and {@link KerberosUser}.
|
||||
* @see SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)
|
||||
* @param hiveConfig The Configuration to apply to the acquired UserGroupInformation
|
||||
* @param kerberosUser The KerberosUser to authenticate
|
||||
* @return A UserGroupInformation instance created using the Subject of the given KerberosUser
|
||||
* @throws AuthenticationFailedException if authentication fails
|
||||
*/
|
||||
public UserGroupInformation authenticate(final Configuration hiveConfig, KerberosUser kerberosUser) throws AuthenticationFailedException {
|
||||
try {
|
||||
return SecurityUtil.getUgiForKerberosUser(hiveConfig, kerberosUser);
|
||||
} catch (IOException ioe) {
|
||||
throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* As of Apache NiFi 1.5.0, due to changes made to
|
||||
* {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
|
||||
|
@ -91,7 +109,9 @@ public class HiveConfigurator {
|
|||
* authentication attempts that would leave the Hive controller service in an unrecoverable state.
|
||||
*
|
||||
* @see SecurityUtil#loginKerberos(Configuration, String, String)
|
||||
* @deprecated Use {@link SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)}
|
||||
*/
|
||||
@Deprecated
|
||||
public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab) throws AuthenticationFailedException {
|
||||
UserGroupInformation ugi;
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue