diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 0102b1f915..a67a9fd0fe 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -25,6 +25,7 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.net.SocketFactory; @@ -50,9 +51,9 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.Tuple; /** * This is a base class that is helpful when building processors interacting with HDFS. @@ -132,15 +133,24 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build(); + private static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false) + .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final List properties; private static final Object RESOURCES_LOCK = new Object(); + private long kerberosReloginThreshold; + private long lastKerberosReloginTime; + static { List props = new ArrayList<>(); props.add(HADOOP_CONFIGURATION_RESOURCES); props.add(KERBEROS_PRINCIPAL); props.add(KERBEROS_KEYTAB); + props.add(KERBEROS_RELOGIN_PERIOD); properties = Collections.unmodifiableList(props); try { NIFI_PROPERTIES = NiFiProperties.getInstance(); @@ -154,12 +164,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } // variables shared by all threads of this processor - // Hadoop Configuration and FileSystem - private final AtomicReference> hdfsResources = new AtomicReference<>(); + // Hadoop Configuration, Filesystem, and UserGroupInformation (optional) + private final AtomicReference hdfsResources = new AtomicReference<>(); @Override protected void init(ProcessorInitializationContext context) { - hdfsResources.set(new Tuple(null, null)); + hdfsResources.set(new HdfsResources(null, null, null)); } @Override @@ -173,8 +183,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { @OnScheduled public final void abstractOnScheduled(ProcessContext context) throws IOException { try { - Tuple resources = hdfsResources.get(); - if (resources.getKey() == null || resources.getValue() == null) { + // This value will be null when called from ListHDFS, because it overrides all of the default + // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos + if (context.getProperty(KERBEROS_RELOGIN_PERIOD).getValue() != null) { + kerberosReloginThreshold = context.getProperty(KERBEROS_RELOGIN_PERIOD).asTimePeriod(TimeUnit.SECONDS); + } + HdfsResources resources = hdfsResources.get(); + if (resources.getConfiguration() == null) { String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue(); String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue(); dir = dir == null ? "/" : dir; @@ -183,14 +198,14 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } } catch (IOException ex) { getLogger().error("HDFS Configuration error - {}", new Object[] { ex }); - hdfsResources.set(new Tuple(null, null)); + hdfsResources.set(new HdfsResources(null, null, null)); throw ex; } } @OnStopped public final void abstractOnStopped() { - hdfsResources.set(new Tuple(null, null)); + hdfsResources.set(new HdfsResources(null, null, null)); } private static Configuration getConfigurationFromResources(String configResources) throws IOException { @@ -224,7 +239,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { /* * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources. */ - Tuple resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException { + HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException { // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical // NarThreadContextClassLoader. @@ -244,13 +259,15 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { // If kerberos is enabled, create the file system as the kerberos principal // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time FileSystem fs = null; + UserGroupInformation ugi = null; synchronized (RESOURCES_LOCK) { if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) { String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue(); String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue(); UserGroupInformation.setConfiguration(config); - UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab); + ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab); fs = getFileSystemAsUser(config, ugi); + lastKerberosReloginTime = System.currentTimeMillis() / 1000; } else { config.set("ipc.client.fallback-to-simple-auth-allowed", "true"); config.set("hadoop.security.authentication", "simple"); @@ -260,7 +277,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { config.set(disableCacheName, "true"); getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}", new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() }); - return new Tuple<>(config, fs); + return new HdfsResources(config, fs, ugi); } finally { Thread.currentThread().setContextClassLoader(savedClassLoader); @@ -392,10 +409,59 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } protected Configuration getConfiguration() { - return hdfsResources.get().getKey(); + return hdfsResources.get().getConfiguration(); } protected FileSystem getFileSystem() { - return hdfsResources.get().getValue(); + // if kerberos is enabled, check if the ticket should be renewed before returning the FS + if (hdfsResources.get().getUserGroupInformation() != null && isTicketOld()) { + tryKerberosRelogin(hdfsResources.get().getUserGroupInformation()); + } + return hdfsResources.get().getFileSystem(); + } + + protected void tryKerberosRelogin(UserGroupInformation ugi) { + try { + getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " + + "attempting to renew ticket for user {}", new Object[]{ + kerberosReloginThreshold, ugi.getUserName()}); + ugi.checkTGTAndReloginFromKeytab(); + lastKerberosReloginTime = System.currentTimeMillis() / 1000; + getLogger().info("Kerberos relogin successful or ticket still valid"); + } catch (IOException e) { + // Most likely case of this happening is ticket is expired and error getting a new one, + // meaning dfs operations would fail + getLogger().error("Kerberos relogin failed", e); + throw new ProcessException("Unable to renew kerberos ticket", e); + } + } + + protected boolean isTicketOld() { + return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold; + } + + + static protected class HdfsResources { + private final Configuration configuration; + private final FileSystem fileSystem; + private final UserGroupInformation userGroupInformation; + + public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) { + this.configuration = configuration; + this.fileSystem = fileSystem; + this.userGroupInformation = userGroupInformation; + } + + public Configuration getConfiguration() { + return configuration; + } + + public FileSystem getFileSystem() { + return fileSystem; + } + + public UserGroupInformation getUserGroupInformation() { + return userGroupInformation; + } } }