diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
index bf922fe73b..1f498de076 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
@@ -30,6 +30,7 @@ import java.security.PrivilegedExceptionAction;
* relogin attempts this thread will sleep for the provided amount of time.
*
*/
+@Deprecated
public class KerberosTicketRenewer implements Runnable {
private final UserGroupInformation ugi;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
index fcb9032853..af2275778f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
@@ -19,9 +19,9 @@ package org.apache.nifi.hadoop;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.logging.ComponentLog;
import java.io.IOException;
+import java.util.Random;
/**
* Provides synchronized access to UserGroupInformation to avoid multiple processors/services from
@@ -35,6 +35,20 @@ public class SecurityUtil {
* Initializes UserGroupInformation with the given Configuration and performs the login for the given principal
* and keytab. All logins should happen through this class to ensure other threads are not concurrently modifying
* UserGroupInformation.
+ *
+ * As of Apache NiFi 1.5.0, this method uses {@link UserGroupInformation#loginUserFromKeytab(String, String)} to
+ * authenticate the given principal, which sets the static variable {@code loginUser} in the
+ * {@link UserGroupInformation} instance. Setting {@code loginUser} is necessary for
+ * {@link org.apache.hadoop.ipc.Client.Connection#handleSaslConnectionFailure(int, int, Exception, Random, UserGroupInformation)}
+ * to be able to attempt a relogin during a connection failure. The {@code handleSaslConnectionFailure} method
+ * calls {@code UserGroupInformation.getLoginUser().reloginFromKeytab()} statically, which can return {@code null}
+ * if {@code loginUser} is not set, resulting in failure of the hadoop operation.
+ *
+ * In previous versions of NiFi, {@link UserGroupInformation#loginUserFromKeytabAndReturnUGI(String, String)} was
+ * used to authenticate the principal, which does not set {@code loginUser}, making it impossible for
+ * {@link org.apache.hadoop.ipc.Client.Connection#handleSaslConnectionFailure(int, int, Exception, Random, UserGroupInformation)}
+ * to be able to implicitly relogin the principal.
*
* @param config the configuration instance
* @param principal the principal to authenticate as
@@ -51,7 +65,8 @@ public class SecurityUtil {
Validate.notNull(keyTab);
UserGroupInformation.setConfiguration(config);
- return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keyTab.trim());
+ UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim());
+ return UserGroupInformation.getCurrentUser();
}
/**
@@ -85,32 +100,4 @@ public class SecurityUtil {
Validate.notNull(config);
return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
}
-
- /**
- * Start a thread that periodically attempts to renew the current Kerberos user's ticket.
- *
- * Callers of this method should store the reference to the KerberosTicketRenewer and call stop() to stop the thread.
- *
- * @param id
- * The unique identifier to use for the thread, can be the class name that started the thread
- * (i.e. PutHDFS, etc)
- * @param ugi
- * The current Kerberos user.
- * @param renewalPeriod
- * The amount of time between attempting renewals.
- * @param logger
- * The logger to use with in the renewer
- *
- * @return the KerberosTicketRenewer Runnable
- */
- public static KerberosTicketRenewer startTicketRenewalThread(final String id, final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) {
- final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, renewalPeriod, logger);
-
- final Thread t = new Thread(renewer);
- t.setName("Kerberos Ticket Renewal [" + id + "]");
- t.start();
-
- return renewer;
- }
-
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 89590fcd28..282a417168 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -28,7 +28,6 @@ import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
@@ -37,7 +36,6 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import javax.net.SocketFactory;
@@ -54,11 +52,18 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* This is a base class that is helpful when building processors interacting with HDFS.
+ *
+ * As of Apache NiFi 1.5.0, the Relogin Period property is no longer used in the configuration of a Hadoop processor.
+ * Due to changes made to {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
+ * class to authenticate a principal with Kerberos, Hadoop components no longer
+ * attempt relogins explicitly. For more information, please read the documentation for
+ * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+ *
+ * @see SecurityUtil#loginKerberos(Configuration, String, String)
*/
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
@@ -91,7 +96,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
.name("Kerberos Relogin Period").required(false)
- .description("Period of time which should pass before attempting a kerberos relogin")
+ .description("Period of time which should pass before attempting a kerberos relogin.\n\nThis property has been deprecated, and has no effect on processing. Relogins "
+ + "now occur automatically.")
.defaultValue("4 hours")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -111,8 +117,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
private static final Object RESOURCES_LOCK = new Object();
- private long kerberosReloginThreshold;
- private long lastKerberosReloginTime;
protected KerberosProperties kerberosProperties;
protected List<PropertyDescriptor> properties;
private volatile File kerberosConfigFile = null;
@@ -195,10 +199,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
try {
// This value will be null when called from ListHDFS, because it overrides all of the default
// properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
- PropertyValue reloginPeriod = context.getProperty(KERBEROS_RELOGIN_PERIOD).evaluateAttributeExpressions();
- if (reloginPeriod.getValue() != null) {
- kerberosReloginThreshold = reloginPeriod.asTimePeriod(TimeUnit.SECONDS);
- }
HdfsResources resources = hdfsResources.get();
if (resources.getConfiguration() == null) {
final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
@@ -274,7 +274,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
fs = getFileSystemAsUser(config, ugi);
- lastKerberosReloginTime = System.currentTimeMillis() / 1000;
} else {
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
config.set("hadoop.security.authentication", "simple");
@@ -403,44 +402,11 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
protected FileSystem getFileSystem() {
- // trigger Relogin if necessary
- getUserGroupInformation();
return hdfsResources.get().getFileSystem();
}
protected UserGroupInformation getUserGroupInformation() {
- // if kerberos is enabled, check if the ticket should be renewed before returning
- UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation();
- if (userGroupInformation != null && isTicketOld()) {
- tryKerberosRelogin(userGroupInformation);
- }
- return userGroupInformation;
- }
-
- protected void tryKerberosRelogin(UserGroupInformation ugi) {
- try {
- getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
- "attempting to renew ticket for user {}", new Object[]{
- kerberosReloginThreshold, ugi.getUserName()});
- ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
- ugi.checkTGTAndReloginFromKeytab();
- return null;
- });
- lastKerberosReloginTime = System.currentTimeMillis() / 1000;
- getLogger().info("Kerberos relogin successful or ticket still valid");
- } catch (IOException e) {
- // Most likely case of this happening is ticket is expired and error getting a new one,
- // meaning dfs operations would fail
- getLogger().error("Kerberos relogin failed", e);
- throw new ProcessException("Unable to renew kerberos ticket", e);
- } catch (InterruptedException e) {
- getLogger().error("Interrupted while attempting Kerberos relogin", e);
- throw new ProcessException("Unable to renew kerberos ticket", e);
- }
- }
-
- protected boolean isTicketOld() {
- return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold;
+ return hdfsResources.get().getUserGroupInformation();
}
static protected class HdfsResources {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/bootstrap.conf b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/bootstrap.conf
index e93678021b..f1da811a20 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/bootstrap.conf
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/bootstrap.conf
@@ -57,6 +57,10 @@ nifi.bootstrap.sensitive.key=
# Sets the provider of SecureRandom to /dev/urandom to prevent blocking on VMs
java.arg.15=-Djava.security.egd=file:/dev/urandom
+# Requires JAAS to use only the provided JAAS configuration to authenticate a Subject, without using any "fallback" methods (such as prompting for username/password)
+# Please see https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/single-signon.html, section "EXCEPTIONS TO THE MODEL"
+java.arg.16=-Djavax.security.auth.useSubjectCredsOnly=true
+
###
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
###
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 64730c84a8..1aefc75ff6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ import org.apache.nifi.util.StopWatch;
import java.io.IOException;
import java.io.InputStream;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -295,6 +297,11 @@ public class GetHDFS extends AbstractHadoopProcessor {
context.yield();
getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
return;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ context.yield();
+ getLogger().warn("Interrupted while retrieving files", e);
+ return;
}
}
@@ -342,13 +349,13 @@ public class GetHDFS extends AbstractHadoopProcessor {
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
for (final Path file : files) {
try {
- if (!hdfs.exists(file)) {
+ if (!getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(file))) {
continue; // if file is no longer there then move on
}
final String originalFilename = file.getName();
final String relativePath = getPathDifference(rootDir, file);
- stream = hdfs.open(file, bufferSize);
+ stream = getUserGroupInformation().doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> hdfs.open(file, bufferSize));
final String outputFilename;
// Check if we should infer compression codec
@@ -374,7 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
- if (!keepSourceFiles && !hdfs.delete(file, false)) {
+ if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.delete(file, false))) {
getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
new Object[]{file});
session.remove(flowFile);
@@ -406,7 +413,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
* @return null if POLLING_INTERVAL has not lapsed. Will return an empty set if no files were found on HDFS that matched the configured filters
* @throws java.io.IOException ex
*/
- protected Set<Path> performListing(final ProcessContext context) throws IOException {
+ protected Set<Path> performListing(final ProcessContext context) throws IOException, InterruptedException {
final long pollingIntervalMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
final long nextPollTime = lastPollTime.get() + pollingIntervalMillis;
@@ -435,7 +442,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
* @return files to process
* @throws java.io.IOException ex
*/
- protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException {
+ protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException, InterruptedException {
if (null == filesVisited) {
filesVisited = new HashSet<>();
}
@@ -446,7 +453,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
final Set<Path> files = new HashSet<>();
- for (final FileStatus file : hdfs.listStatus(dir)) {
+ FileStatus[] fileStatuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(dir));
+ for (final FileStatus file : fileStatuses) {
if (files.size() >= MAX_WORKING_QUEUE_SIZE) {
// no need to make the files set larger than what we would queue anyway
break;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 14d057d2e2..d33fc2e074 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -49,6 +49,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.File;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -343,6 +344,10 @@ public class ListHDFS extends AbstractHadoopProcessor {
} catch (final IOException | IllegalArgumentException e) {
getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
return;
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ getLogger().error("Interrupted while performing listing of HDFS", e);
+ return;
}
final Set listable = determineListable(statuses, context);
@@ -381,11 +386,11 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
}
- private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException {
+ private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException, InterruptedException {
final Set statusSet = new HashSet<>();
getLogger().debug("Fetching listing for {}", new Object[] {path});
- final FileStatus[] statuses = hdfs.listStatus(path, filter);
+ final FileStatus[] statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
for ( final FileStatus status : statuses ) {
if ( status.isDirectory() ) {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index e2bb3eacb3..00a4851059 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -144,13 +144,11 @@ public class AbstractHadoopTest {
// initialize the runner with EL for the kerberos properties
runner.setProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES, "${variableHadoopConfigResources}");
runner.setProperty(kerberosProperties.getKerberosPrincipal(), "${variablePrincipal}");
- runner.setProperty(AbstractHadoopProcessor.KERBEROS_RELOGIN_PERIOD, "${variableReloginPeriod}");
runner.setProperty(kerberosProperties.getKerberosKeytab(), "${variableKeytab}");
// add variables for all the kerberos properties except for the keytab
runner.setVariable("variableHadoopConfigResources", "src/test/resources/core-site-security.xml");
runner.setVariable("variablePrincipal", "principal");
- runner.setVariable("variableReloginPeriod", "4m");
// test that the config is not valid, since the EL for keytab will return nothing, no keytab
runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
index c5a1c021a7..69d1acdc61 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
@@ -35,7 +35,6 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -46,7 +45,6 @@ public class GetHDFSSequenceFileTest {
private Configuration configuration;
private FileSystem fileSystem;
private UserGroupInformation userGroupInformation;
- private boolean isTicketOld;
private boolean reloginTried;
@Before
@@ -57,7 +55,6 @@ public class GetHDFSSequenceFileTest {
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, userGroupInformation);
getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
- isTicketOld = false;
reloginTried = false;
init();
}
@@ -68,7 +65,8 @@ public class GetHDFSSequenceFileTest {
getHDFSSequenceFile.onScheduled(context);
}
- private void getFlowFilesWithUgi() throws Exception {
+ @Test
+ public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
SequenceFileReader reader = mock(SequenceFileReader.class);
Path file = mock(Path.class);
getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file);
@@ -77,21 +75,9 @@ public class GetHDFSSequenceFileTest {
verify(userGroupInformation).doAs(privilegedExceptionActionArgumentCaptor.capture());
privilegedExceptionActionArgumentCaptor.getValue().run();
verify(reader).readSequenceFile(file, configuration, fileSystem);
- }
-
- @Test
- public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
- getFlowFilesWithUgi();
assertFalse(reloginTried);
}
- @Test
- public void getFlowFilesWithUgiAndOldTicketShouldCallDoAsAndRelogin() throws Exception {
- isTicketOld = true;
- getFlowFilesWithUgi();
- assertTrue(reloginTried);
- }
-
@Test
public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, null);
@@ -117,15 +103,5 @@ public class GetHDFSSequenceFileTest {
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return kerberosProperties;
}
-
- @Override
- protected boolean isTicketOld() {
- return isTicketOld;
- }
-
- @Override
- protected void tryKerberosRelogin(UserGroupInformation ugi) {
- reloginTried = true;
- }
}
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index c6941d371d..211494e224 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -143,8 +143,6 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
.build();
- private static final long TICKET_RENEWAL_PERIOD = 60000;
-
private List<PropertyDescriptor> properties;
private String connectionUrl = "unknown";
@@ -206,7 +204,26 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
* This operation makes no guarantees that the actual connection could be
* made since the underlying system may still go off-line during normal
* operation of the connection pool.
+ *
+ * As of Apache NiFi 1.5.0, due to changes made to
+ * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this class invoking
+ * {@link HiveConfigurator#authenticate(Configuration, String, String)}
+ * to authenticate a principal with Kerberos, Hive controller services no longer
+ * attempt relogins explicitly. For more information, please read the documentation for
+ * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+ *
+ * In previous versions of NiFi, a {@link org.apache.nifi.hadoop.KerberosTicketRenewer} was started by
+ * {@link HiveConfigurator#authenticate(Configuration, String, String, long)} when the Hive
+ * controller service was enabled. The use of a separate thread to explicitly relogin could cause race conditions
+ * with the implicit relogin attempts made by hadoop/Hive code on a thread that references the same
+ * {@link UserGroupInformation} instance. One of these threads could leave the
+ * {@link javax.security.auth.Subject} in {@link UserGroupInformation} to be cleared or in an unexpected state
+ * while the other thread is attempting to use the {@link javax.security.auth.Subject}, resulting in failed
+ * authentication attempts that would leave the Hive controller service in an unrecoverable state.
*
+ * @see SecurityUtil#loginKerberos(Configuration, String, String)
+ * @see HiveConfigurator#authenticate(Configuration, String, String)
+ * @see HiveConfigurator#authenticate(Configuration, String, String, long)
* @param context the configuration context
* @throws InitializationException if unable to create a database connection
*/
@@ -234,7 +251,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
try {
- ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
+ ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab);
} catch (AuthenticationFailedException ae) {
log.error(ae.getMessage(), ae);
}
@@ -269,9 +286,6 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
*/
@OnDisabled
public void shutdown() {
-
- hiveConfigurator.stopRenewer();
-
try {
dataSource.close();
} catch (final SQLException e) {
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index cc241e0072..f5f8dc6d36 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -287,8 +287,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
private List<PropertyDescriptor> propertyDescriptors;
private Set<Relationship> relationships;
- private static final long TICKET_RENEWAL_PERIOD = 60000;
-
protected KerberosProperties kerberosProperties;
private volatile File kerberosConfigFile = null;
@@ -374,7 +372,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
try {
- ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
+ ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab);
} catch (AuthenticationFailedException ae) {
throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
}
@@ -865,7 +863,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
}
ugi = null;
- hiveConfigurator.stopRenewer();
}
private void setupHeartBeatTimer(int heartbeatInterval) {
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
index 2e663ad7b5..6d53683d75 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.hadoop.KerberosTicketRenewer;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
@@ -39,9 +38,6 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class HiveConfigurator {
- private volatile KerberosTicketRenewer renewer;
-
-
public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
final List<ValidationResult> problems = new ArrayList<>();
@@ -81,26 +77,43 @@ public class HiveConfigurator {
}
}
- public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab, long ticketRenewalPeriod, ComponentLog log) throws AuthenticationFailedException {
-
+ /**
+ * As of Apache NiFi 1.5.0, due to changes made to
+ * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
+ * class to authenticate a principal with Kerberos, Hive controller services no longer
+ * attempt relogins explicitly. For more information, please read the documentation for
+ * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+ *
+ * In previous versions of NiFi, a {@link org.apache.nifi.hadoop.KerberosTicketRenewer} was started by
+ * {@link HiveConfigurator#authenticate(Configuration, String, String, long)} when the Hive
+ * controller service was enabled. The use of a separate thread to explicitly relogin could cause race conditions
+ * with the implicit relogin attempts made by hadoop/Hive code on a thread that references the same
+ * {@link UserGroupInformation} instance. One of these threads could leave the
+ * {@link javax.security.auth.Subject} in {@link UserGroupInformation} to be cleared or in an unexpected state
+ * while the other thread is attempting to use the {@link javax.security.auth.Subject}, resulting in failed
+ * authentication attempts that would leave the Hive controller service in an unrecoverable state.
+ *
+ * @see SecurityUtil#loginKerberos(Configuration, String, String)
+ */
+ public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab) throws AuthenticationFailedException {
UserGroupInformation ugi;
try {
ugi = SecurityUtil.loginKerberos(hiveConfig, principal, keyTab);
} catch (IOException ioe) {
throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe);
}
-
- // if we got here then we have a ugi so start a renewer
- if (ugi != null) {
- final String id = getClass().getSimpleName();
- renewer = SecurityUtil.startTicketRenewalThread(id, ugi, ticketRenewalPeriod, log);
- }
return ugi;
}
- public void stopRenewer() {
- if (renewer != null) {
- renewer.stop();
- }
+ /**
+ * As of Apache NiFi 1.5.0, this method has been deprecated and is now a wrapper
+ * method which invokes {@link HiveConfigurator#authenticate(Configuration, String, String)}. It will no longer start a
+ * {@link org.apache.nifi.hadoop.KerberosTicketRenewer} to perform explicit relogins.
+ *
+ * @see HiveConfigurator#authenticate(Configuration, String, String)
+ */
+ @Deprecated
+ public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab, long ticketRenewalPeriod) throws AuthenticationFailedException {
+ return authenticate(hiveConfig, principal, keyTab);
}
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index f16cc6517b..8e10bab1cb 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -66,8 +66,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -131,7 +129,7 @@ public class TestPutHiveStreaming {
public void testUgiGetsSetIfSecure() throws AuthenticationFailedException, IOException {
when(hiveConf.get(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION)).thenReturn(SecurityUtil.KERBEROS);
ugi = mock(UserGroupInformation.class);
- when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString(), anyLong(), any())).thenReturn(ugi);
+ when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString())).thenReturn(ugi);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index bc097e4b99..12309d6693 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -48,7 +48,6 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.hadoop.KerberosTicketRenewer;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
@@ -90,11 +89,8 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent";
static final String HBASE_CONF_CLIENT_RETRIES = "hbase.client.retries.number";
- static final long TICKET_RENEWAL_PERIOD = 60000;
-
private volatile Connection connection;
private volatile UserGroupInformation ugi;
- private volatile KerberosTicketRenewer renewer;
private List properties;
private KerberosProperties kerberosProperties;
@@ -190,6 +186,23 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
return problems;
}
+ /**
+ * As of Apache NiFi 1.5.0, due to changes made to
+ * {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
+ * class to authenticate a principal with Kerberos, HBase controller services no longer
+ * attempt relogins explicitly. For more information, please read the documentation for
+ * {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
+ *
+ * In previous versions of NiFi, a {@link org.apache.nifi.hadoop.KerberosTicketRenewer} was started
+ * when the HBase controller service was enabled. The use of a separate thread to explicitly relogin could cause
+ * race conditions with the implicit relogin attempts made by Hadoop/HBase code on a thread that references the same
+ * {@link UserGroupInformation} instance. One of these threads could cause the
+ * {@link javax.security.auth.Subject} in {@link UserGroupInformation} to be cleared or left in an unexpected state
+ * while the other thread is attempting to use the {@link javax.security.auth.Subject}, resulting in failed
+ * authentication attempts that would leave the HBase controller service in an unrecoverable state.
+ *
+ * @see SecurityUtil#loginKerberos(Configuration, String, String)
+ */
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
this.connection = createConnection(context);
@@ -200,12 +213,6 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
if (admin != null) {
admin.listTableNames();
}
-
- // if we got here then we have a successful connection, so if we have a ugi then start a renewer
- if (ugi != null) {
- final String id = getClass().getSimpleName();
- renewer = SecurityUtil.startTicketRenewalThread(id, ugi, TICKET_RENEWAL_PERIOD, getLogger());
- }
}
}
@@ -269,10 +276,6 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@OnDisabled
public void shutdown() {
- if (renewer != null) {
- renewer.stop();
- }
-
if (connection != null) {
try {
connection.close();