mirror of https://github.com/apache/nifi.git
NIFI-7018: Initial commit of processors extending AbstractHadoopProcessor supporting kerberos passwords
AbstractHadoopProcessor will always authenticate the principal with a KerberosUser implementation and a UGI will be acquired from the Subject associated with the KerberosUser implementation AbstractHadoopProcessor's getUserGroupInformation method will now attempt to check the TGT and relogin if a KerberosUser impelmentation is available, otherwise it will return the UGI referenced in the HdfsResource instance Updated AbstractHadoopProcessor's customValidate method to consider the provided password and updated validation failure explanations when a KerberosCredentialsService is specified together with a principal, password, or keytab Added toString method override to AbstractKerberosUser Updated Hive/HBase components to be compatible with the KerberosProperties.validatePrincipalWithKeytabOrPassword method Fixed null ComponentLog in GetHDFSSequenceFileTest Added package-protected accessor method (getAllowExplicitKeytabEnvironmentVariable) to AbstractHadoopProcessor for determining if the environment variable "NIFI_ALLOW_EXPLICIT_KEYTAB" has been set AbstractHadoopProcessor will now only fail validation when the NIFI_ALLOW_EXPLICIT_KEYTAB environment variable is set to false if a keytab is provided to allow the user to specify a principal and password Added AbstractHadoopProcessorSpec to verify validation of principal/keytab/password/kerberos credential service combinations This closes #4095.
This commit is contained in:
parent
23b04ae968
commit
614136ce51
|
@ -70,7 +70,11 @@ public abstract class AbstractKerberosUser implements KerberosUser {
|
|||
// If it's the first time ever calling login then we need to initialize a new context
|
||||
if (loginContext == null) {
|
||||
LOGGER.debug("Initializing new login context...");
|
||||
this.subject = new Subject();
|
||||
if (this.subject == null) {
|
||||
// only create a new subject if a current one does not exist
|
||||
// other classes may be referencing an existing subject and replacing it may break functionality of those other classes after relogin
|
||||
this.subject = new Subject();
|
||||
}
|
||||
this.loginContext = createLoginContext(subject);
|
||||
}
|
||||
|
||||
|
@ -100,7 +104,6 @@ public abstract class AbstractKerberosUser implements KerberosUser {
|
|||
loggedIn.set(false);
|
||||
LOGGER.debug("Successful logout for {}", new Object[]{principal});
|
||||
|
||||
subject = null;
|
||||
loginContext = null;
|
||||
} catch (LoginException e) {
|
||||
throw new LoginException("Logout failed due to: " + e.getMessage());
|
||||
|
@ -240,4 +243,11 @@ public abstract class AbstractKerberosUser implements KerberosUser {
|
|||
return this.subject;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "KerberosUser{" +
|
||||
"principal='" + principal + '\'' +
|
||||
", loggedIn=" + loggedIn +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,15 +26,21 @@ import org.mockito.Mockito;
|
|||
|
||||
import javax.security.auth.Subject;
|
||||
import javax.security.auth.kerberos.KerberosPrincipal;
|
||||
import javax.security.auth.kerberos.KerberosTicket;
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.io.File;
|
||||
import java.security.AccessControlContext;
|
||||
import java.security.AccessController;
|
||||
import java.security.Principal;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class KerberosUserIT {
|
||||
|
@ -167,6 +173,24 @@ public class KerberosUserIT {
|
|||
}
|
||||
}
|
||||
assertEquals(true, performedRelogin);
|
||||
|
||||
Subject subject = user1.doAs((PrivilegedAction<Subject>) () -> {
|
||||
AccessControlContext context = AccessController.getContext();
|
||||
return Subject.getSubject(context);
|
||||
});
|
||||
|
||||
// verify only a single KerberosTicket exists in the Subject after relogin
|
||||
Set<KerberosTicket> kerberosTickets = subject.getPrivateCredentials(KerberosTicket.class);
|
||||
assertEquals(1, kerberosTickets.size());
|
||||
|
||||
// verify the new ticket lifetime is valid for the current time
|
||||
KerberosTicket kerberosTicket = kerberosTickets.iterator().next();
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
long startMilli = kerberosTicket.getStartTime().toInstant().toEpochMilli();
|
||||
long endMilli = kerberosTicket.getEndTime().toInstant().toEpochMilli();
|
||||
System.out.println("New ticket is valid for " + TimeUnit.MILLISECONDS.toSeconds(endMilli - startMilli) + " seconds");
|
||||
assertTrue(startMilli < currentTimeMillis);
|
||||
assertTrue(endMilli > currentTimeMillis);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -34,6 +34,11 @@
|
|||
<artifactId>nifi-utils</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-security-utils</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
|
||||
|
@ -62,6 +67,22 @@
|
|||
<version>4.5.5</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.spockframework</groupId>
|
||||
<artifactId>spock-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-minikdc</artifactId>
|
||||
<version>3.2.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -45,6 +45,7 @@ public class KerberosProperties {
|
|||
private final Validator kerberosConfigValidator;
|
||||
private final PropertyDescriptor kerberosPrincipal;
|
||||
private final PropertyDescriptor kerberosKeytab;
|
||||
private final PropertyDescriptor kerberosPassword;
|
||||
|
||||
/**
|
||||
* Instantiate a KerberosProperties object but keep in mind it is
|
||||
|
@ -90,13 +91,23 @@ public class KerberosProperties {
|
|||
.build();
|
||||
|
||||
this.kerberosKeytab = new PropertyDescriptor.Builder()
|
||||
.name("Kerberos Keytab").required(false)
|
||||
.name("Kerberos Keytab")
|
||||
.required(false)
|
||||
.description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
|
||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||
.addValidator(kerberosConfigValidator)
|
||||
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
this.kerberosPassword = new PropertyDescriptor.Builder()
|
||||
.name("Kerberos Password")
|
||||
.required(false)
|
||||
.description("Kerberos password associated with the principal.")
|
||||
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
public File getKerberosConfigFile() {
|
||||
|
@ -115,7 +126,12 @@ public class KerberosProperties {
|
|||
return kerberosKeytab;
|
||||
}
|
||||
|
||||
public static List<ValidationResult> validatePrincipalAndKeytab(final String subject, final Configuration config, final String principal, final String keytab, final ComponentLog logger) {
|
||||
public PropertyDescriptor getKerberosPassword() {
|
||||
return kerberosPassword;
|
||||
}
|
||||
|
||||
public static List<ValidationResult> validatePrincipalWithKeytabOrPassword(final String subject, final Configuration config, final String principal, final String keytab,
|
||||
final String password, final ComponentLog logger) {
|
||||
final List<ValidationResult> results = new ArrayList<>();
|
||||
|
||||
// if security is enabled then the keytab and principal are required
|
||||
|
@ -131,11 +147,21 @@ public class KerberosProperties {
|
|||
}
|
||||
|
||||
final boolean blankKeytab = (keytab == null || keytab.isEmpty());
|
||||
if (isSecurityEnabled && blankKeytab) {
|
||||
final boolean blankPassword = (password == null || password.isEmpty());
|
||||
|
||||
if (isSecurityEnabled && blankKeytab && blankPassword) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.valid(false)
|
||||
.subject(subject)
|
||||
.explanation("Kerberos Keytab must be provided when using a secure configuration")
|
||||
.explanation("Kerberos Keytab or Kerberos Password must be provided when using a secure configuration")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (isSecurityEnabled && !blankKeytab && !blankPassword) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.valid(false)
|
||||
.subject(subject)
|
||||
.explanation("Cannot specify both a Kerberos Keytab and a Kerberos Password")
|
||||
.build());
|
||||
}
|
||||
|
||||
|
|
|
@ -19,9 +19,18 @@ package org.apache.nifi.hadoop;
|
|||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
import javax.security.auth.kerberos.KerberosPrincipal;
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.io.IOException;
|
||||
import java.security.AccessControlContext;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Random;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Provides synchronized access to UserGroupInformation to avoid multiple processors/services from
|
||||
|
@ -69,6 +78,29 @@ public class SecurityUtil {
|
|||
return UserGroupInformation.getCurrentUser();
|
||||
}
|
||||
|
||||
public static synchronized UserGroupInformation getUgiForKerberosUser(final Configuration config, final KerberosUser kerberosUser) throws IOException {
|
||||
UserGroupInformation.setConfiguration(config);
|
||||
try {
|
||||
if (kerberosUser.isLoggedIn()) {
|
||||
kerberosUser.checkTGTAndRelogin();
|
||||
} else {
|
||||
kerberosUser.login();
|
||||
}
|
||||
return kerberosUser.doAs((PrivilegedExceptionAction<UserGroupInformation>) () -> {
|
||||
AccessControlContext context = AccessController.getContext();
|
||||
Subject subject = Subject.getSubject(context);
|
||||
Validate.notEmpty(
|
||||
subject.getPrincipals(KerberosPrincipal.class).stream().filter(p -> p.getName().startsWith(kerberosUser.getPrincipal())).collect(Collectors.toSet()),
|
||||
"No Subject was found matching the given principal");
|
||||
return UserGroupInformation.getUGIFromSubject(subject);
|
||||
});
|
||||
} catch (PrivilegedActionException e) {
|
||||
throw new IOException("Unable to acquire UGI for KerberosUser: " + e.getException().getLocalizedMessage(), e.getException());
|
||||
} catch (LoginException e) {
|
||||
throw new IOException("Unable to acquire UGI for KerberosUser: " + e.getLocalizedMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes UserGroupInformation with the given Configuration and returns UserGroupInformation.getLoginUser().
|
||||
* All logins should happen through this class to ensure other threads are not concurrently modifying
|
||||
|
|
|
@ -39,9 +39,14 @@ import org.apache.nifi.logging.ComponentLog;
|
|||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.WeakReference;
|
||||
|
@ -100,8 +105,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
.defaultValue(CompressionType.NONE.toString())
|
||||
.build();
|
||||
|
||||
/*
|
||||
* TODO This property has been deprecated, remove for NiFi 2.0
|
||||
*/
|
||||
public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
|
||||
.name("Kerberos Relogin Period").required(false)
|
||||
.name("Kerberos Relogin Period")
|
||||
.required(false)
|
||||
.description("Period of time which should pass before attempting a kerberos relogin.\n\nThis property has been deprecated, and has no effect on processing. " +
|
||||
"Relogins now occur automatically.")
|
||||
.defaultValue("4 hours")
|
||||
|
@ -130,6 +139,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
|
||||
|
||||
private static final Object RESOURCES_LOCK = new Object();
|
||||
private static final HdfsResources EMPTY_HDFS_RESOURCES = new HdfsResources(null, null, null, null);
|
||||
|
||||
protected KerberosProperties kerberosProperties;
|
||||
protected List<PropertyDescriptor> properties;
|
||||
|
@ -144,7 +154,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
protected void init(ProcessorInitializationContext context) {
|
||||
hdfsResources.set(new HdfsResources(null, null, null));
|
||||
hdfsResources.set(EMPTY_HDFS_RESOURCES);
|
||||
|
||||
kerberosConfigFile = context.getKerberosConfigurationFile();
|
||||
kerberosProperties = getKerberosProperties(kerberosConfigFile);
|
||||
|
@ -154,6 +164,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
props.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
props.add(kerberosProperties.getKerberosPrincipal());
|
||||
props.add(kerberosProperties.getKerberosKeytab());
|
||||
props.add(kerberosProperties.getKerberosPassword());
|
||||
props.add(KERBEROS_RELOGIN_PERIOD);
|
||||
props.add(ADDITIONAL_CLASSPATH_RESOURCES);
|
||||
properties = Collections.unmodifiableList(props);
|
||||
|
@ -173,10 +184,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).evaluateAttributeExpressions().getValue();
|
||||
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
final String resolvedPrincipal;
|
||||
final String resolvedKeytab;
|
||||
final String resolvedPassword;
|
||||
if (credentialsService == null) {
|
||||
resolvedPrincipal = explicitPrincipal;
|
||||
resolvedKeytab = explicitKeytab;
|
||||
|
@ -184,6 +197,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
resolvedPrincipal = credentialsService.getPrincipal();
|
||||
resolvedKeytab = credentialsService.getKeytab();
|
||||
}
|
||||
resolvedPassword = explicitPassword;
|
||||
|
||||
final List<ValidationResult> results = new ArrayList<>();
|
||||
|
||||
|
@ -205,8 +219,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
}
|
||||
|
||||
final Configuration conf = resources.getConfiguration();
|
||||
results.addAll(KerberosProperties.validatePrincipalAndKeytab(
|
||||
this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, getLogger()));
|
||||
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, resolvedPassword, getLogger()));
|
||||
|
||||
} catch (final IOException e) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
|
@ -216,20 +230,20 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
.build());
|
||||
}
|
||||
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject("Kerberos Credentials")
|
||||
.valid(false)
|
||||
.explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
|
||||
.explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
|
||||
.build());
|
||||
}
|
||||
|
||||
final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
|
||||
if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
final String allowExplicitKeytabVariable = getAllowExplicitKeytabEnvironmentVariable();
|
||||
if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && explicitKeytab != null) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject("Kerberos Credentials")
|
||||
.valid(false)
|
||||
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
|
||||
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. "
|
||||
+ "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
|
||||
.build());
|
||||
}
|
||||
|
@ -253,7 +267,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
}
|
||||
} catch (Exception ex) {
|
||||
getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
|
||||
hdfsResources.set(new HdfsResources(null, null, null));
|
||||
hdfsResources.set(EMPTY_HDFS_RESOURCES);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
@ -294,7 +308,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
}
|
||||
|
||||
// Clear out the reference to the resources
|
||||
hdfsResources.set(new HdfsResources(null, null, null));
|
||||
hdfsResources.set(EMPTY_HDFS_RESOURCES);
|
||||
}
|
||||
|
||||
private void interruptStatisticsThread(final FileSystem fileSystem) throws NoSuchFieldException, IllegalAccessException {
|
||||
|
@ -371,10 +385,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
// -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time
|
||||
FileSystem fs;
|
||||
UserGroupInformation ugi;
|
||||
KerberosUser kerberosUser;
|
||||
synchronized (RESOURCES_LOCK) {
|
||||
if (SecurityUtil.isSecurityEnabled(config)) {
|
||||
String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
|
||||
String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
|
||||
String password = context.getProperty(kerberosProperties.getKerberosPassword()).evaluateAttributeExpressions().getValue();
|
||||
|
||||
// If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
|
||||
// The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
|
||||
|
@ -384,22 +400,29 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
keyTab = credentialsService.getKeytab();
|
||||
}
|
||||
|
||||
ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
|
||||
fs = getFileSystemAsUser(config, ugi);
|
||||
if (keyTab != null) {
|
||||
kerberosUser = new KerberosKeytabUser(principal, keyTab);
|
||||
} else if (password != null) {
|
||||
kerberosUser = new KerberosPasswordUser(principal, password);
|
||||
} else {
|
||||
throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided");
|
||||
}
|
||||
ugi = SecurityUtil.getUgiForKerberosUser(config, kerberosUser);
|
||||
} else {
|
||||
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
|
||||
config.set("hadoop.security.authentication", "simple");
|
||||
ugi = SecurityUtil.loginSimple(config);
|
||||
fs = getFileSystemAsUser(config, ugi);
|
||||
kerberosUser = null;
|
||||
}
|
||||
fs = getFileSystemAsUser(config, ugi);
|
||||
}
|
||||
getLogger().debug("resetHDFSResources UGI {}", new Object[]{ugi});
|
||||
getLogger().debug("resetHDFSResources UGI [{}], KerberosUser [{}]", new Object[]{ugi, kerberosUser});
|
||||
|
||||
final Path workingDir = fs.getWorkingDirectory();
|
||||
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
|
||||
new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
|
||||
|
||||
return new HdfsResources(config, fs, ugi);
|
||||
return new HdfsResources(config, fs, ugi, kerberosUser);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -518,18 +541,43 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
}
|
||||
|
||||
protected UserGroupInformation getUserGroupInformation() {
|
||||
getLogger().trace("getting UGI instance");
|
||||
if (hdfsResources.get().getKerberosUser() != null) {
|
||||
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
|
||||
KerberosUser kerberosUser = hdfsResources.get().getKerberosUser();
|
||||
synchronized (kerberosUser) {
|
||||
getLogger().debug("kerberosUser is " + kerberosUser);
|
||||
try {
|
||||
getLogger().debug("checking TGT on kerberosUser [{}]" + kerberosUser);
|
||||
kerberosUser.checkTGTAndRelogin();
|
||||
} catch (LoginException e) {
|
||||
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
|
||||
}
|
||||
return hdfsResources.get().getUserGroupInformation();
|
||||
}
|
||||
|
||||
/*
|
||||
* Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables.
|
||||
*/
|
||||
String getAllowExplicitKeytabEnvironmentVariable() {
|
||||
return System.getenv(ALLOW_EXPLICIT_KEYTAB);
|
||||
}
|
||||
|
||||
static protected class HdfsResources {
|
||||
private final Configuration configuration;
|
||||
private final FileSystem fileSystem;
|
||||
private final UserGroupInformation userGroupInformation;
|
||||
private final KerberosUser kerberosUser;
|
||||
|
||||
public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) {
|
||||
public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation, KerberosUser kerberosUser) {
|
||||
this.configuration = configuration;
|
||||
this.fileSystem = fileSystem;
|
||||
this.userGroupInformation = userGroupInformation;
|
||||
this.kerberosUser = kerberosUser;
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
|
@ -543,6 +591,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
public UserGroupInformation getUserGroupInformation() {
|
||||
return userGroupInformation;
|
||||
}
|
||||
|
||||
public KerberosUser getKerberosUser() {
|
||||
return kerberosUser;
|
||||
}
|
||||
}
|
||||
|
||||
static protected class ValidationResources {
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.hadoop
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
|
||||
import org.apache.hadoop.minikdc.MiniKdc
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import spock.lang.Shared
|
||||
import spock.lang.Specification
|
||||
|
||||
class SecurityUtilITSpec extends Specification {
|
||||
|
||||
@Shared
|
||||
private static Logger LOGGER
|
||||
@Shared
|
||||
private MiniKdc miniKdc
|
||||
@Shared
|
||||
private Configuration configuration
|
||||
|
||||
def setupSpec() {
|
||||
LOGGER = LoggerFactory.getLogger SecurityUtilITSpec
|
||||
configuration = new Configuration()
|
||||
configuration.set CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"
|
||||
configuration.setBoolean CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true
|
||||
configuration.setInt CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN, 5
|
||||
def miniKdcConf = MiniKdc.createConf()
|
||||
miniKdcConf.setProperty MiniKdc.INSTANCE, InetAddress.localHost.hostName
|
||||
miniKdcConf.setProperty MiniKdc.ORG_NAME, "EXAMPLE"
|
||||
miniKdcConf.setProperty MiniKdc.ORG_DOMAIN, "COM"
|
||||
miniKdcConf.setProperty MiniKdc.DEBUG, "false"
|
||||
miniKdcConf.setProperty MiniKdc.MAX_TICKET_LIFETIME, "5"
|
||||
miniKdc = new MiniKdc(miniKdcConf, new File("./target/minikdc"))
|
||||
miniKdc.start()
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
miniKdc.stop()
|
||||
}
|
||||
|
||||
def "getUgiForKerberosUser with unauthenticated KerberosPasswordUser returns correct UGI"() {
|
||||
given:
|
||||
def principal = "testprincipal2"
|
||||
def password = "password"
|
||||
miniKdc.createPrincipal principal, password
|
||||
|
||||
when: "A KerberosPasswordUser is created for the given principal"
|
||||
def kerberosPasswordUser = new KerberosPasswordUser(principal, password)
|
||||
|
||||
then: "The created KerberosPasswordUser is not logged in"
|
||||
!kerberosPasswordUser.isLoggedIn()
|
||||
|
||||
when: "A UGI is acquired for the KerberosPasswordUser"
|
||||
def ugi = SecurityUtil.getUgiForKerberosUser configuration, kerberosPasswordUser
|
||||
|
||||
then: "The KerberosPasswordUser is logged in and the acquired UGI is valid for the given principal"
|
||||
kerberosPasswordUser.isLoggedIn()
|
||||
ugi != null
|
||||
ugi.getShortUserName() == principal
|
||||
LOGGER.debug "UGI = [{}], KerberosUser = [{}]", ugi, kerberosPasswordUser
|
||||
}
|
||||
|
||||
def "getUgiForKerberosUser with authenticated KerberosPasswordUser returns correct UGI"() {
|
||||
given:
|
||||
def principal = "testprincipal3"
|
||||
def password = "password"
|
||||
miniKdc.createPrincipal principal, password
|
||||
|
||||
when: "A KerberosPasswordUser is created for the given principal and authenticated"
|
||||
def kerberosPasswordUser = new KerberosPasswordUser(principal, password)
|
||||
kerberosPasswordUser.login()
|
||||
|
||||
then: "The created KerberosPasswordUser is logged in"
|
||||
kerberosPasswordUser.isLoggedIn()
|
||||
|
||||
when: "A UGI is acquired for the KerberosPasswordUser"
|
||||
def ugi = SecurityUtil.getUgiForKerberosUser configuration, kerberosPasswordUser
|
||||
|
||||
then: "The KerberosPasswordUser is logged in and the acquired UGI is valid for the given principal"
|
||||
kerberosPasswordUser.isLoggedIn()
|
||||
ugi != null
|
||||
ugi.getShortUserName() == principal
|
||||
LOGGER.debug "UGI = [{}], KerberosUser = [{}]", ugi, kerberosPasswordUser
|
||||
}
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop
|
||||
|
||||
import org.apache.nifi.components.PropertyValue
|
||||
import org.apache.nifi.components.ValidationContext
|
||||
import org.apache.nifi.hadoop.KerberosProperties
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService
|
||||
import org.apache.nifi.processor.ProcessContext
|
||||
import org.apache.nifi.processor.ProcessSession
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext
|
||||
import org.apache.nifi.processor.exception.ProcessException
|
||||
import org.apache.nifi.util.MockComponentLog
|
||||
import org.apache.nifi.util.MockPropertyValue
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import spock.lang.Specification
|
||||
import spock.lang.Unroll
|
||||
|
||||
class AbstractHadoopProcessorSpec extends Specification {
|
||||
private static Logger LOGGER = LoggerFactory.getLogger AbstractHadoopProcessorSpec
|
||||
|
||||
@Unroll
|
||||
def "customValidate for #testName"() {
|
||||
given:
|
||||
def testAbstractHadoopProcessor = new TestAbstractHadoopProcessor(allowExplicitKeytab);
|
||||
testAbstractHadoopProcessor.kerberosProperties = new KerberosProperties(new File('src/test/resources/krb5.conf'))
|
||||
def mockProcessorInitializationContext = Mock ProcessorInitializationContext
|
||||
def mockValidationContext = Mock ValidationContext
|
||||
def mockHadoopConfigurationResourcesPropertyValue = Mock PropertyValue
|
||||
def mockKerberosCredentialsServicePropertyValue = Mock PropertyValue
|
||||
def mockKerberosCredentialsServiceControllerService = Mock KerberosCredentialsService
|
||||
|
||||
when:
|
||||
testAbstractHadoopProcessor.initialize(mockProcessorInitializationContext)
|
||||
def validationResults = testAbstractHadoopProcessor.customValidate(mockValidationContext)
|
||||
|
||||
then:
|
||||
1 * mockProcessorInitializationContext.getLogger() >> new MockComponentLog("AbstractHadoopProcessorSpec", testAbstractHadoopProcessor)
|
||||
1 * mockValidationContext.getProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES) >> mockHadoopConfigurationResourcesPropertyValue
|
||||
1 * mockHadoopConfigurationResourcesPropertyValue.evaluateAttributeExpressions() >> mockHadoopConfigurationResourcesPropertyValue
|
||||
1 * mockHadoopConfigurationResourcesPropertyValue.getValue() >> "src/test/resources/test-secure-core-site.xml"
|
||||
|
||||
1 * mockValidationContext.getProperty(AbstractHadoopProcessor.KERBEROS_CREDENTIALS_SERVICE) >> mockKerberosCredentialsServicePropertyValue
|
||||
if (configuredKeytabCredentialsService) {
|
||||
1 * mockKerberosCredentialsServicePropertyValue.asControllerService(KerberosCredentialsService.class) >> mockKerberosCredentialsServiceControllerService
|
||||
1 * mockKerberosCredentialsServiceControllerService.principal >> configuredKeytabCredentialsServicePrincipal
|
||||
1 * mockKerberosCredentialsServiceControllerService.keytab >> configuredKeytabCredentialsServiceKeytab
|
||||
}
|
||||
|
||||
1 * mockValidationContext.getProperty(testAbstractHadoopProcessor.kerberosProperties.kerberosPrincipal) >> new MockPropertyValue(configuredPrincipal)
|
||||
1 * mockValidationContext.getProperty(testAbstractHadoopProcessor.kerberosProperties.kerberosPassword) >> new MockPropertyValue(configuredPassword)
|
||||
1 * mockValidationContext.getProperty(testAbstractHadoopProcessor.kerberosProperties.kerberosKeytab) >> new MockPropertyValue(configuredKeytab)
|
||||
|
||||
then:
|
||||
def actualValidationErrors = validationResults.each { !it.isValid() }
|
||||
if (actualValidationErrors.size() > 0) {
|
||||
actualValidationErrors.each { LOGGER.debug(it.toString()) }
|
||||
}
|
||||
actualValidationErrors.size() == expectedValidationErrorCount
|
||||
|
||||
where:
|
||||
testName | configuredPrincipal | configuredKeytab | configuredPassword | allowExplicitKeytab | configuredKeytabCredentialsService | configuredKeytabCredentialsServicePrincipal | configuredKeytabCredentialsServiceKeytab || expectedValidationErrorCount
|
||||
"success case 1" | "principal" | "keytab" | null | "true" | false | null | null || 0
|
||||
"success case 2" | "principal" | null | "password" | "true" | false | null | null || 0
|
||||
"success case 3" | "principal" | null | "password" | "false" | false | null | null || 0
|
||||
"success case 4" | null | null | null | "true" | true | "principal" | "keytab" || 0
|
||||
"success case 5" | null | null | null | "false" | true | "principal" | "keytab" || 0
|
||||
// do not allow explicit keytab, but provide one anyway; validation fails
|
||||
"failure case 1" | "principal" | "keytab" | null | "false" | false | null | null || 1
|
||||
"failure case 2" | null | "keytab" | null | "false" | false | null | null || 2
|
||||
// keytab credentials service is provided, but explicit properties for principal, password, or keytab are also provided; validation fails
|
||||
"failure case 3" | "principal" | null | null | "true" | true | "principal" | "keytab" || 1
|
||||
"failure case 4" | null | "keytab" | null | "true" | true | "principal" | "keytab" || 1
|
||||
"failure case 5" | null | null | "password" | "true" | true | "principal" | "keytab" || 2
|
||||
"failure case 6" | "principal" | null | null | "false" | true | "principal" | "keytab" || 1
|
||||
"failure case 7" | null | "keytab" | null | "false" | true | "principal" | "keytab" || 2
|
||||
"failure case 8" | null | null | "password" | "false" | true | "principal" | "keytab" || 2
|
||||
}
|
||||
|
||||
private class TestAbstractHadoopProcessor extends AbstractHadoopProcessor {
|
||||
def allowExplicitKeytab = false
|
||||
|
||||
TestAbstractHadoopProcessor(def allowExplicitKeytab) {
|
||||
this.allowExplicitKeytab = allowExplicitKeytab
|
||||
}
|
||||
|
||||
@Override
|
||||
void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
throw new NoSuchMethodError("not intended to be invoked by the test, this implementation is only intended for custom validation purposes")
|
||||
}
|
||||
|
||||
@Override
|
||||
String getAllowExplicitKeytabEnvironmentVariable() {
|
||||
allowExplicitKeytab
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -65,26 +65,70 @@ public class TestKerberosProperties {
|
|||
final ComponentLog log = Mockito.mock(ComponentLog.class);
|
||||
final Configuration config = new Configuration();
|
||||
|
||||
// no security enabled in config so doesn't matter what principal and keytab are
|
||||
List<ValidationResult> results = KerberosProperties.validatePrincipalAndKeytab(
|
||||
"test", config, null, null, log);
|
||||
// no security enabled in config so doesn't matter what principal, keytab, and password are
|
||||
List<ValidationResult> results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, null, null, null, log);
|
||||
Assert.assertEquals(0, results.size());
|
||||
|
||||
results = KerberosProperties.validatePrincipalAndKeytab(
|
||||
"test", config, "principal", null, log);
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, "principal", null, null, log);
|
||||
Assert.assertEquals(0, results.size());
|
||||
|
||||
results = KerberosProperties.validatePrincipalAndKeytab(
|
||||
"test", config, "principal", "keytab", log);
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, "principal", "keytab", null, log);
|
||||
Assert.assertEquals(0, results.size());
|
||||
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, "principal", null, "password", log);
|
||||
Assert.assertEquals(0, results.size());
|
||||
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, "principal", "keytab", "password", log);
|
||||
Assert.assertEquals(0, results.size());
|
||||
|
||||
// change the config to have kerberos turned on
|
||||
config.set("hadoop.security.authentication", "kerberos");
|
||||
config.set("hadoop.security.authorization", "true");
|
||||
|
||||
results = KerberosProperties.validatePrincipalAndKeytab(
|
||||
"test", config, null, null, log);
|
||||
// security is enabled, no principal, keytab, or password provided
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, null, null, null, log);
|
||||
Assert.assertEquals(2, results.size());
|
||||
|
||||
// security is enabled, keytab provided, no principal or password provided
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, null, "keytab", null, log);
|
||||
Assert.assertEquals(1, results.size());
|
||||
|
||||
// security is enabled, password provided, no principal or keytab provided
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, null, null, "password", log);
|
||||
Assert.assertEquals(1, results.size());
|
||||
|
||||
// security is enabled, no principal provided, keytab and password provided
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, null, "keytab", "password", log);
|
||||
Assert.assertEquals(2, results.size());
|
||||
|
||||
// security is enabled, principal provided, no keytab or password provided
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, "principal", null, null, log);
|
||||
Assert.assertEquals(1, results.size());
|
||||
|
||||
// security is enabled, principal and keytab provided, no password provided
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, "principal", "keytab", null, log);
|
||||
Assert.assertEquals(0, results.size());
|
||||
|
||||
// security is enabled, no keytab provided, principal and password provided
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, "principal", null, "password", log);
|
||||
Assert.assertEquals(0, results.size());
|
||||
|
||||
// security is enabled, principal, keytab, and password provided
|
||||
results = KerberosProperties.validatePrincipalWithKeytabOrPassword(
|
||||
"test", config, "principal", "keytab", "password", log);
|
||||
Assert.assertEquals(1, results.size());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Root logger option
|
||||
log4j.rootLogger=WARN, stdout
|
||||
|
||||
log4j.logger.org.apache.nifi=INFO
|
||||
log4j.logger.org.apache.hadoop.security=INFO
|
||||
log4j.logger.org.apache.hadoop.minikdc=WARN
|
||||
log4j.logger.org.apache.nifi.hadoop.SecurityUtilITSpec=DEBUG
|
||||
log4j.logger.org.apache.nifi.processors.hadoop.AbstractHadoopProcessorSpec=DEBUG
|
||||
|
||||
# log messages to console
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.Target=System.out
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d %-5p [%t] %40.40c - %m%n
|
|
@ -0,0 +1,21 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<configuration>
|
||||
<property>
|
||||
<name>hadoop.security.authentication</name>
|
||||
<value>kerberos</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -25,6 +25,7 @@ import org.apache.nifi.hadoop.KerberosProperties;
|
|||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
|
||||
import org.apache.nifi.util.MockComponentLog;
|
||||
import org.apache.nifi.util.MockProcessContext;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -38,6 +39,7 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class GetHDFSSequenceFileTest {
|
||||
private AbstractHadoopProcessor.HdfsResources hdfsResources;
|
||||
|
@ -52,7 +54,7 @@ public class GetHDFSSequenceFileTest {
|
|||
configuration = mock(Configuration.class);
|
||||
fileSystem = mock(FileSystem.class);
|
||||
userGroupInformation = mock(UserGroupInformation.class);
|
||||
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, userGroupInformation);
|
||||
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, userGroupInformation, null);
|
||||
getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
|
||||
getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
|
||||
reloginTried = false;
|
||||
|
@ -61,7 +63,10 @@ public class GetHDFSSequenceFileTest {
|
|||
|
||||
private void init() throws IOException {
|
||||
final MockProcessContext context = new MockProcessContext(getHDFSSequenceFile);
|
||||
getHDFSSequenceFile.init(mock(ProcessorInitializationContext.class));
|
||||
ProcessorInitializationContext mockProcessorInitializationContext = mock(ProcessorInitializationContext.class);
|
||||
when(mockProcessorInitializationContext.getLogger()).thenReturn(new MockComponentLog("GetHDFSSequenceFileTest", getHDFSSequenceFile ));
|
||||
getHDFSSequenceFile.initialize(mockProcessorInitializationContext);
|
||||
getHDFSSequenceFile.init(mockProcessorInitializationContext);
|
||||
getHDFSSequenceFile.onScheduled(context);
|
||||
}
|
||||
|
||||
|
@ -80,7 +85,7 @@ public class GetHDFSSequenceFileTest {
|
|||
|
||||
@Test
|
||||
public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
|
||||
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, null);
|
||||
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, null, null);
|
||||
init();
|
||||
SequenceFileReader reader = mock(SequenceFileReader.class);
|
||||
Path file = mock(Path.class);
|
||||
|
|
|
@ -50,7 +50,7 @@ public class HiveConfigurator {
|
|||
|
||||
final Configuration hiveConfig = resources.getConfiguration();
|
||||
|
||||
problems.addAll(KerberosProperties.validatePrincipalAndKeytab(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, log));
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, null, log));
|
||||
|
||||
return problems;
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ public class HiveConfigurator {
|
|||
|
||||
final Configuration hiveConfig = resources.getConfiguration();
|
||||
|
||||
problems.addAll(KerberosProperties.validatePrincipalAndKeytab(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, log));
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, null, log));
|
||||
|
||||
return problems;
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ public class HiveConfigurator {
|
|||
|
||||
final Configuration hiveConfig = resources.getConfiguration();
|
||||
|
||||
problems.addAll(KerberosProperties.validatePrincipalAndKeytab(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, log));
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, null, log));
|
||||
|
||||
return problems;
|
||||
}
|
||||
|
|
|
@ -261,7 +261,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
|||
|
||||
final Configuration hbaseConfig = resources.getConfiguration();
|
||||
|
||||
problems.addAll(KerberosProperties.validatePrincipalAndKeytab(getClass().getSimpleName(), hbaseConfig, resolvedPrincipal, resolvedKeytab, getLogger()));
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hbaseConfig, resolvedPrincipal, resolvedKeytab, null, getLogger()));
|
||||
}
|
||||
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
|
|
|
@ -260,7 +260,7 @@ public class HBase_2_ClientService extends AbstractControllerService implements
|
|||
|
||||
final Configuration hbaseConfig = resources.getConfiguration();
|
||||
|
||||
problems.addAll(KerberosProperties.validatePrincipalAndKeytab(getClass().getSimpleName(), hbaseConfig, resolvedPrincipal, resolvedKeytab, getLogger()));
|
||||
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hbaseConfig, resolvedPrincipal, resolvedKeytab, null, getLogger()));
|
||||
}
|
||||
|
||||
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
|
||||
|
|
Loading…
Reference in New Issue