mirror of https://github.com/apache/nifi.git
NIFI-4323 This closes #2360. Wrapped Get/ListHDFS hadoop operations in ugi.doAs calls
NIFI-3472 NIFI-4350 Removed explicit relogin code from HDFS/Hive/HBase components and updated SecurityUtil.loginKerberos to use UGI.loginUserFromKeytab. This brings those components in line with daemon-process-style usage, made possible by NiFi's InstanceClassloader isolation. Relogin (on ticket expiry/connection failure) can now be handled implicitly by hadoop-client code.
NIFI-3472 Added a default value (true) for javax.security.auth.useSubjectCredsOnly to bootstrap.conf
NIFI-3472 Added javadoc explaining the removal of explicit relogin threads and the use of UGI.loginUserFromKeytab
Re-added the Relogin Period property to AbstractHadoopProcessor and updated its documentation to indicate that it is now deprecated
Additional cleanup of code that referenced relogin periods
Marked KerberosTicketRenewer as deprecated
NIFI-3472 Cleaned up imports in TestPutHiveStreaming
This commit is contained in:
parent 7467bb7b0f
commit 42a1ee011b

KerberosTicketRenewer.java
@@ -30,6 +30,7 @@ import java.security.PrivilegedExceptionAction;
  * relogin attempts this thread will sleep for the provided amount of time.
  *
  */
+@Deprecated
 public class KerberosTicketRenewer implements Runnable {
 
     private final UserGroupInformation ugi;

SecurityUtil.java
@@ -19,9 +19,9 @@ package org.apache.nifi.hadoop;
 import org.apache.commons.lang3.Validate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.logging.ComponentLog;
 
 import java.io.IOException;
+import java.util.Random;
 
 /**
  * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from
@@ -35,6 +35,20 @@ public class SecurityUtil {
  * Initializes UserGroupInformation with the given Configuration and performs the login for the given principal
  * and keytab. All logins should happen through this class to ensure other threads are not concurrently modifying
  * UserGroupInformation.
+ * <p/>
+ * As of Apache NiFi 1.5.0, this method uses {@link UserGroupInformation#loginUserFromKeytab(String, String)} to
+ * authenticate the given <code>principal</code>, which sets the static variable <code>loginUser</code> in the
+ * {@link UserGroupInformation} instance. Setting <code>loginUser</code> is necessary for
+ * {@link org.apache.hadoop.ipc.Client.Connection#handleSaslConnectionFailure(int, int, Exception, Random, UserGroupInformation)}
+ * to be able to attempt a relogin during a connection failure. The <code>handleSaslConnectionFailure</code> method
+ * calls <code>UserGroupInformation.getLoginUser().reloginFromKeytab()</code> statically, which can return null
+ * if <code>loginUser</code> is not set, resulting in failure of the hadoop operation.
+ * <p/>
+ * In previous versions of NiFi, {@link UserGroupInformation#loginUserFromKeytabAndReturnUGI(String, String)} was
+ * used to authenticate the <code>principal</code>, which does not set <code>loginUser</code>, making it impossible
+ * for a
+ * {@link org.apache.hadoop.ipc.Client.Connection#handleSaslConnectionFailure(int, int, Exception, Random, UserGroupInformation)}
+ * to be able to implicitly relogin the principal.
  *
  * @param config the configuration instance
  * @param principal the principal to authenticate as
@@ -51,7 +65,8 @@ public class SecurityUtil {
         Validate.notNull(keyTab);
 
         UserGroupInformation.setConfiguration(config);
-        return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keyTab.trim());
+        UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim());
+        return UserGroupInformation.getCurrentUser();
     }
 
     /**
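
For context, a minimal sketch of the difference this hunk makes (the principal and keytab values below are illustrative, not taken from this commit): loginUserFromKeytab records the static login user that hadoop-client's implicit relogin path (UserGroupInformation.getLoginUser().reloginFromKeytab()) relies on, while the previous loginUserFromKeytabAndReturnUGI call left that static field unset.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class LoginSketch {
    public static UserGroupInformation login(final Configuration config) throws IOException {
        UserGroupInformation.setConfiguration(config);

        // Old approach: returns a UGI but does not set UserGroupInformation's static login user,
        // so the IPC client has no login user to relogin on a SASL connection failure.
        // final UserGroupInformation ugi =
        //         UserGroupInformation.loginUserFromKeytabAndReturnUGI("nifi@EXAMPLE.COM", "/etc/nifi.keytab");

        // New approach: sets the static login user, then hands the caller that same identity.
        UserGroupInformation.loginUserFromKeytab("nifi@EXAMPLE.COM", "/etc/nifi.keytab");
        return UserGroupInformation.getCurrentUser();
    }
}
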
@@ -85,32 +100,4 @@ public class SecurityUtil {
         Validate.notNull(config);
         return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
     }
 
-    /**
-     * Start a thread that periodically attempts to renew the current Kerberos user's ticket.
-     *
-     * Callers of this method should store the reference to the KerberosTicketRenewer and call stop() to stop the thread.
-     *
-     * @param id
-     *          The unique identifier to use for the thread, can be the class name that started the thread
-     *          (i.e. PutHDFS, etc)
-     * @param ugi
-     *          The current Kerberos user.
-     * @param renewalPeriod
-     *          The amount of time between attempting renewals.
-     * @param logger
-     *          The logger to use with in the renewer
-     *
-     * @return the KerberosTicketRenewer Runnable
-     */
-    public static KerberosTicketRenewer startTicketRenewalThread(final String id, final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) {
-        final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, renewalPeriod, logger);
-
-        final Thread t = new Thread(renewer);
-        t.setName("Kerberos Ticket Renewal [" + id + "]");
-        t.start();
-
-        return renewer;
-    }
-
 }

AbstractHadoopProcessor.java
@@ -28,7 +28,6 @@ import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.hadoop.KerberosProperties;
@@ -37,7 +36,6 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import javax.net.SocketFactory;
@@ -54,11 +52,18 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.WeakHashMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * This is a base class that is helpful when building processors interacting with HDFS.
+ * <p/>
+ * As of Apache NiFi 1.5.0, the Relogin Period property is no longer used in the configuration of a Hadoop processor.
+ * Due to changes made to {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
+ * class to authenticate a principal with Kerberos, Hadoop components no longer
+ * attempt relogins explicitly. For more information, please read the documentation for
+ * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+ *
+ * @see SecurityUtil#loginKerberos(Configuration, String, String)
  */
 @RequiresInstanceClassLoading(cloneAncestorResources = true)
 public abstract class AbstractHadoopProcessor extends AbstractProcessor {
@@ -91,7 +96,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
 
     public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
             .name("Kerberos Relogin Period").required(false)
-            .description("Period of time which should pass before attempting a kerberos relogin")
+            .description("Period of time which should pass before attempting a kerberos relogin.\n\nThis property has been deprecated, and has no effect on processing. Relogins"
+                    + "now occur automatically.")
             .defaultValue("4 hours")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -111,8 +117,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
 
     private static final Object RESOURCES_LOCK = new Object();
 
-    private long kerberosReloginThreshold;
-    private long lastKerberosReloginTime;
     protected KerberosProperties kerberosProperties;
     protected List<PropertyDescriptor> properties;
     private volatile File kerberosConfigFile = null;
@@ -195,10 +199,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         try {
             // This value will be null when called from ListHDFS, because it overrides all of the default
             // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
-            PropertyValue reloginPeriod = context.getProperty(KERBEROS_RELOGIN_PERIOD).evaluateAttributeExpressions();
-            if (reloginPeriod.getValue() != null) {
-                kerberosReloginThreshold = reloginPeriod.asTimePeriod(TimeUnit.SECONDS);
-            }
             HdfsResources resources = hdfsResources.get();
             if (resources.getConfiguration() == null) {
                 final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
@@ -274,7 +274,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
                 String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
                 ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
                 fs = getFileSystemAsUser(config, ugi);
-                lastKerberosReloginTime = System.currentTimeMillis() / 1000;
             } else {
                 config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                 config.set("hadoop.security.authentication", "simple");
@@ -403,44 +402,11 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
 
     protected FileSystem getFileSystem() {
-        // trigger Relogin if necessary
-        getUserGroupInformation();
         return hdfsResources.get().getFileSystem();
     }
 
     protected UserGroupInformation getUserGroupInformation() {
-        // if kerberos is enabled, check if the ticket should be renewed before returning
-        UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation();
-        if (userGroupInformation != null && isTicketOld()) {
-            tryKerberosRelogin(userGroupInformation);
-        }
-        return userGroupInformation;
-    }
-
-    protected void tryKerberosRelogin(UserGroupInformation ugi) {
-        try {
-            getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
-                "attempting to renew ticket for user {}", new Object[]{
-                kerberosReloginThreshold, ugi.getUserName()});
-            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-                ugi.checkTGTAndReloginFromKeytab();
-                return null;
-            });
-            lastKerberosReloginTime = System.currentTimeMillis() / 1000;
-            getLogger().info("Kerberos relogin successful or ticket still valid");
-        } catch (IOException e) {
-            // Most likely case of this happening is ticket is expired and error getting a new one,
-            // meaning dfs operations would fail
-            getLogger().error("Kerberos relogin failed", e);
-            throw new ProcessException("Unable to renew kerberos ticket", e);
-        } catch (InterruptedException e) {
-            getLogger().error("Interrupted while attempting Kerberos relogin", e);
-            throw new ProcessException("Unable to renew kerberos ticket", e);
-        }
-    }
-
-    protected boolean isTicketOld() {
-        return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold;
+        return hdfsResources.get().getUserGroupInformation();
     }
 
     static protected class HdfsResources {

bootstrap.conf
@@ -57,6 +57,10 @@ nifi.bootstrap.sensitive.key=
 # Sets the provider of SecureRandom to /dev/urandom to prevent blocking on VMs
 java.arg.15=-Djava.security.egd=file:/dev/urandom
 
+# Requires JAAS to use only the provided JAAS configuration to authenticate a Subject, without using any "fallback" methods (such as prompting for username/password)
+# Please see https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/single-signon.html, section "EXCEPTIONS TO THE MODEL"
+java.arg.16=-Djavax.security.auth.useSubjectCredsOnly=true
+
 ###
 # Notification Services for notifying interested parties when NiFi is stopped, started, dies
 ###
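
As a hedged aside (not part of this commit), the same flag can be applied to any other JVM that exercises this Kerberos code path, for example a standalone test harness; it generally needs to be in place before the first Kerberos/JAAS login runs, which is why NiFi supplies it as a bootstrap JVM argument rather than setting it later.

public class UseSubjectCredsOnlySketch {
    public static void main(final String[] args) {
        // Programmatic equivalent of bootstrap.conf's java.arg.16; set before any JAAS login is attempted.
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
    }
}
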

GetHDFS.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -295,6 +297,11 @@ public class GetHDFS extends AbstractHadoopProcessor {
             context.yield();
             getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
             return;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            context.yield();
+            getLogger().warn("Interrupted while retrieving files", e);
+            return;
         }
     }
 
@@ -342,13 +349,13 @@ public class GetHDFS extends AbstractHadoopProcessor {
         final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
         for (final Path file : files) {
             try {
-                if (!hdfs.exists(file)) {
+                if (!getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(file))) {
                     continue; // if file is no longer there then move on
                 }
                 final String originalFilename = file.getName();
                 final String relativePath = getPathDifference(rootDir, file);
 
-                stream = hdfs.open(file, bufferSize);
+                stream = getUserGroupInformation().doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> hdfs.open(file, bufferSize));
 
                 final String outputFilename;
                 // Check if we should infer compression codec
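
The pattern in this hunk can be read as a small helper (the helper below is an illustrative sketch, not code from the commit): every FileSystem call runs inside ugi.doAs(...), and because UserGroupInformation.doAs declares both IOException and InterruptedException, the surrounding GetHDFS/ListHDFS methods now declare InterruptedException as well.

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

final class DoAsSketch {
    private DoAsSketch() {
    }

    // Runs a single HDFS existence check as the logged-in principal; relogin on an expired
    // ticket is left to hadoop-client inside the IPC layer rather than to the processor.
    static boolean existsAsUser(final UserGroupInformation ugi, final FileSystem hdfs, final Path file)
            throws IOException, InterruptedException {
        return ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(file));
    }
}
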
@@ -374,7 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
 
-                if (!keepSourceFiles && !hdfs.delete(file, false)) {
+                if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.delete(file, false))) {
                     getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
                             new Object[]{file});
                     session.remove(flowFile);
@@ -406,7 +413,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
      * @return null if POLLING_INTERVAL has not lapsed. Will return an empty set if no files were found on HDFS that matched the configured filters
      * @throws java.io.IOException ex
      */
-    protected Set<Path> performListing(final ProcessContext context) throws IOException {
+    protected Set<Path> performListing(final ProcessContext context) throws IOException, InterruptedException {
 
         final long pollingIntervalMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
         final long nextPollTime = lastPollTime.get() + pollingIntervalMillis;
@@ -435,7 +442,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
      * @return files to process
      * @throws java.io.IOException ex
      */
-    protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException {
+    protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException, InterruptedException {
         if (null == filesVisited) {
             filesVisited = new HashSet<>();
         }
@@ -446,7 +453,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
 
         final Set<Path> files = new HashSet<>();
 
-        for (final FileStatus file : hdfs.listStatus(dir)) {
+        FileStatus[] fileStatuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(dir));
+        for (final FileStatus file : fileStatuses) {
             if (files.size() >= MAX_WORKING_QUEUE_SIZE) {
                 // no need to make the files set larger than what we would queue anyway
                 break;

ListHDFS.java
@@ -49,6 +49,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.File;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -343,6 +344,10 @@ public class ListHDFS extends AbstractHadoopProcessor {
         } catch (final IOException | IllegalArgumentException e) {
             getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
             return;
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            getLogger().error("Interrupted while performing listing of HDFS", e);
+            return;
         }
 
         final Set<FileStatus> listable = determineListable(statuses, context);
@@ -381,11 +386,11 @@ public class ListHDFS extends AbstractHadoopProcessor {
             }
         }
 
-    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException {
+    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException, InterruptedException {
         final Set<FileStatus> statusSet = new HashSet<>();
 
         getLogger().debug("Fetching listing for {}", new Object[] {path});
-        final FileStatus[] statuses = hdfs.listStatus(path, filter);
+        final FileStatus[] statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
 
         for ( final FileStatus status : statuses ) {
             if ( status.isDirectory() ) {

AbstractHadoopTest.java
@@ -144,13 +144,11 @@ public class AbstractHadoopTest {
         // initialize the runner with EL for the kerberos properties
         runner.setProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES, "${variableHadoopConfigResources}");
         runner.setProperty(kerberosProperties.getKerberosPrincipal(), "${variablePrincipal}");
-        runner.setProperty(AbstractHadoopProcessor.KERBEROS_RELOGIN_PERIOD, "${variableReloginPeriod}");
         runner.setProperty(kerberosProperties.getKerberosKeytab(), "${variableKeytab}");
 
         // add variables for all the kerberos properties except for the keytab
         runner.setVariable("variableHadoopConfigResources", "src/test/resources/core-site-security.xml");
         runner.setVariable("variablePrincipal", "principal");
-        runner.setVariable("variableReloginPeriod", "4m");
         // test that the config is not valid, since the EL for keytab will return nothing, no keytab
         runner.assertNotValid();

GetHDFSSequenceFileTest.java
@@ -35,7 +35,6 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -46,7 +45,6 @@ public class GetHDFSSequenceFileTest {
     private Configuration configuration;
     private FileSystem fileSystem;
     private UserGroupInformation userGroupInformation;
-    private boolean isTicketOld;
     private boolean reloginTried;
 
     @Before
@@ -57,7 +55,6 @@ public class GetHDFSSequenceFileTest {
         hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, userGroupInformation);
         getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
         getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
-        isTicketOld = false;
         reloginTried = false;
         init();
     }
@@ -68,7 +65,8 @@ public class GetHDFSSequenceFileTest {
         getHDFSSequenceFile.onScheduled(context);
     }
 
-    private void getFlowFilesWithUgi() throws Exception {
+    @Test
+    public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
         SequenceFileReader reader = mock(SequenceFileReader.class);
         Path file = mock(Path.class);
         getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file);
@@ -77,21 +75,9 @@ public class GetHDFSSequenceFileTest {
         verify(userGroupInformation).doAs(privilegedExceptionActionArgumentCaptor.capture());
         privilegedExceptionActionArgumentCaptor.getValue().run();
         verify(reader).readSequenceFile(file, configuration, fileSystem);
-    }
-
-    @Test
-    public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
-        getFlowFilesWithUgi();
         assertFalse(reloginTried);
     }
 
-    @Test
-    public void getFlowFilesWithUgiAndOldTicketShouldCallDoAsAndRelogin() throws Exception {
-        isTicketOld = true;
-        getFlowFilesWithUgi();
-        assertTrue(reloginTried);
-    }
-
     @Test
     public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
         hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, null);
@@ -117,15 +103,5 @@ public class GetHDFSSequenceFileTest {
         protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
             return kerberosProperties;
         }
-
-        @Override
-        protected boolean isTicketOld() {
-            return isTicketOld;
-        }
-
-        @Override
-        protected void tryKerberosRelogin(UserGroupInformation ugi) {
-            reloginTried = true;
-        }
     }
 }

HiveConnectionPool.java
@@ -143,8 +143,6 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .build();
 
 
-    private static final long TICKET_RENEWAL_PERIOD = 60000;
-
     private List<PropertyDescriptor> properties;
 
     private String connectionUrl = "unknown";
@@ -206,7 +204,26 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
      * This operation makes no guarantees that the actual connection could be
      * made since the underlying system may still go off-line during normal
      * operation of the connection pool.
+     * <p/>
+     * As of Apache NiFi 1.5.0, due to changes made to
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this class invoking
+     * {@link HiveConfigurator#authenticate(Configuration, String, String)}
+     * to authenticate a principal with Kerberos, Hive controller services no longer
+     * attempt relogins explicitly. For more information, please read the documentation for
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+     * <p/>
+     * In previous versions of NiFi, a {@link org.apache.nifi.hadoop.KerberosTicketRenewer} was started by
+     * {@link HiveConfigurator#authenticate(Configuration, String, String, long)} when the Hive
+     * controller service was enabled. The use of a separate thread to explicitly relogin could cause race conditions
+     * with the implicit relogin attempts made by hadoop/Hive code on a thread that references the same
+     * {@link UserGroupInformation} instance. One of these threads could leave the
+     * {@link javax.security.auth.Subject} in {@link UserGroupInformation} to be cleared or in an unexpected state
+     * while the other thread is attempting to use the {@link javax.security.auth.Subject}, resulting in failed
+     * authentication attempts that would leave the Hive controller service in an unrecoverable state.
      *
+     * @see SecurityUtil#loginKerberos(Configuration, String, String)
+     * @see HiveConfigurator#authenticate(Configuration, String, String)
+     * @see HiveConfigurator#authenticate(Configuration, String, String, long)
      * @param context the configuration context
      * @throws InitializationException if unable to create a database connection
      */
@@ -234,7 +251,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
 
             log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
             try {
-                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
+                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab);
             } catch (AuthenticationFailedException ae) {
                 log.error(ae.getMessage(), ae);
             }
@@ -269,9 +286,6 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
      */
     @OnDisabled
     public void shutdown() {
-
-        hiveConfigurator.stopRenewer();
-
         try {
             dataSource.close();
         } catch (final SQLException e) {

PutHiveStreaming.java
@@ -287,8 +287,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     private List<PropertyDescriptor> propertyDescriptors;
     private Set<Relationship> relationships;
 
-    private static final long TICKET_RENEWAL_PERIOD = 60000;
-
     protected KerberosProperties kerberosProperties;
     private volatile File kerberosConfigFile = null;
 
@@ -374,7 +372,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
 
             log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
             try {
-                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
+                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab);
             } catch (AuthenticationFailedException ae) {
                 throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
             }
@@ -865,7 +863,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         }
 
         ugi = null;
-        hiveConfigurator.stopRenewer();
     }
 
     private void setupHeartBeatTimer(int heartbeatInterval) {

HiveConfigurator.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.hadoop.KerberosTicketRenewer;
 import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.logging.ComponentLog;
 
@@ -39,9 +38,6 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class HiveConfigurator {
 
-    private volatile KerberosTicketRenewer renewer;
-
-
     public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
 
         final List<ValidationResult> problems = new ArrayList<>();
@@ -81,26 +77,43 @@ public class HiveConfigurator {
         }
     }
 
-    public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab, long ticketRenewalPeriod, ComponentLog log) throws AuthenticationFailedException {
+    /**
+     * As of Apache NiFi 1.5.0, due to changes made to
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
+     * class to authenticate a principal with Kerberos, Hive controller services no longer
+     * attempt relogins explicitly. For more information, please read the documentation for
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+     * <p/>
+     * In previous versions of NiFi, a {@link org.apache.nifi.hadoop.KerberosTicketRenewer} was started by
+     * {@link HiveConfigurator#authenticate(Configuration, String, String, long)} when the Hive
+     * controller service was enabled. The use of a separate thread to explicitly relogin could cause race conditions
+     * with the implicit relogin attempts made by hadoop/Hive code on a thread that references the same
+     * {@link UserGroupInformation} instance. One of these threads could leave the
+     * {@link javax.security.auth.Subject} in {@link UserGroupInformation} to be cleared or in an unexpected state
+     * while the other thread is attempting to use the {@link javax.security.auth.Subject}, resulting in failed
+     * authentication attempts that would leave the Hive controller service in an unrecoverable state.
+     *
+     * @see SecurityUtil#loginKerberos(Configuration, String, String)
+     */
+    public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab) throws AuthenticationFailedException {
         UserGroupInformation ugi;
         try {
             ugi = SecurityUtil.loginKerberos(hiveConfig, principal, keyTab);
         } catch (IOException ioe) {
             throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe);
         }
-
-        // if we got here then we have a ugi so start a renewer
-        if (ugi != null) {
-            final String id = getClass().getSimpleName();
-            renewer = SecurityUtil.startTicketRenewalThread(id, ugi, ticketRenewalPeriod, log);
-        }
         return ugi;
     }
 
-    public void stopRenewer() {
-        if (renewer != null) {
-            renewer.stop();
-        }
+    /**
+     * As of Apache NiFi 1.5.0, this method has been deprecated and is now a wrapper
+     * method which invokes {@link HiveConfigurator#authenticate(Configuration, String, String)}. It will no longer start a
+     * {@link org.apache.nifi.hadoop.KerberosTicketRenewer} to perform explicit relogins.
+     *
+     * @see HiveConfigurator#authenticate(Configuration, String, String)
+     */
+    @Deprecated
+    public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab, long ticketRenewalPeriod) throws AuthenticationFailedException {
+        return authenticate(hiveConfig, principal, keyTab);
     }
 }
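
A short hedged usage sketch of the two overloads above (the import packages are assumed to be the NiFi Hive bundle's org.apache.nifi.util.hive, and the variable names are illustrative): existing callers that passed a renewal period keep compiling through the deprecated overload, while new callers drop the extra arguments.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;

class HiveLoginSketch {
    UserGroupInformation login(final HiveConfigurator hiveConfigurator, final Configuration hiveConfig,
                               final String principal, final String keyTab) throws AuthenticationFailedException {
        // Deprecated, still compiles: hiveConfigurator.authenticate(hiveConfig, principal, keyTab, 60000L);
        // Preferred form after this commit:
        return hiveConfigurator.authenticate(hiveConfig, principal, keyTab);
    }
}
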

TestPutHiveStreaming.java
@@ -66,8 +66,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -131,7 +129,7 @@ public class TestPutHiveStreaming {
     public void testUgiGetsSetIfSecure() throws AuthenticationFailedException, IOException {
         when(hiveConf.get(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION)).thenReturn(SecurityUtil.KERBEROS);
         ugi = mock(UserGroupInformation.class);
-        when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString(), anyLong(), any())).thenReturn(ugi);
+        when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString())).thenReturn(ugi);
         runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");

HBase_1_1_2_ClientService.java
@@ -48,7 +48,6 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.hadoop.KerberosTicketRenewer;
 import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
@@ -90,11 +89,8 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
     static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent";
     static final String HBASE_CONF_CLIENT_RETRIES = "hbase.client.retries.number";
 
-    static final long TICKET_RENEWAL_PERIOD = 60000;
-
     private volatile Connection connection;
     private volatile UserGroupInformation ugi;
-    private volatile KerberosTicketRenewer renewer;
 
     private List<PropertyDescriptor> properties;
     private KerberosProperties kerberosProperties;
@@ -190,6 +186,23 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
         return problems;
     }
 
+    /**
+     * As of Apache NiFi 1.5.0, due to changes made to
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
+     * class to authenticate a principal with Kerberos, HBase controller services no longer
+     * attempt relogins explicitly. For more information, please read the documentation for
+     * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+     * <p/>
+     * In previous versions of NiFi, a {@link org.apache.nifi.hadoop.KerberosTicketRenewer} was started
+     * when the HBase controller service was enabled. The use of a separate thread to explicitly relogin could cause
+     * race conditions with the implicit relogin attempts made by hadoop/HBase code on a thread that references the same
+     * {@link UserGroupInformation} instance. One of these threads could leave the
+     * {@link javax.security.auth.Subject} in {@link UserGroupInformation} to be cleared or in an unexpected state
+     * while the other thread is attempting to use the {@link javax.security.auth.Subject}, resulting in failed
+     * authentication attempts that would leave the HBase controller service in an unrecoverable state.
+     *
+     * @see SecurityUtil#loginKerberos(Configuration, String, String)
+     */
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
         this.connection = createConnection(context);
@@ -200,12 +213,6 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
             if (admin != null) {
                 admin.listTableNames();
             }
-
-            // if we got here then we have a successful connection, so if we have a ugi then start a renewer
-            if (ugi != null) {
-                final String id = getClass().getSimpleName();
-                renewer = SecurityUtil.startTicketRenewalThread(id, ugi, TICKET_RENEWAL_PERIOD, getLogger());
-            }
         }
     }
 
@@ -269,10 +276,6 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
 
     @OnDisabled
     public void shutdown() {
-        if (renewer != null) {
-            renewer.stop();
-        }
-
         if (connection != null) {
             try {
                 connection.close();