mirror of https://github.com/apache/nifi.git
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi
This commit is contained in:
commit
8a80060851
|
@ -25,6 +25,7 @@ import java.security.PrivilegedExceptionAction;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
@ -50,9 +51,9 @@ import org.apache.nifi.components.Validator;
|
|||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.util.Tuple;
|
||||
|
||||
/**
|
||||
* This is a base class that is helpful when building processors interacting with HDFS.
|
||||
|
@ -132,15 +133,24 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
.description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
|
||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
|
||||
|
||||
private static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false)
|
||||
.description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
protected static final List<PropertyDescriptor> properties;
|
||||
|
||||
private static final Object RESOURCES_LOCK = new Object();
|
||||
|
||||
private long kerberosReloginThreshold;
|
||||
private long lastKerberosReloginTime;
|
||||
|
||||
static {
|
||||
List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(HADOOP_CONFIGURATION_RESOURCES);
|
||||
props.add(KERBEROS_PRINCIPAL);
|
||||
props.add(KERBEROS_KEYTAB);
|
||||
props.add(KERBEROS_RELOGIN_PERIOD);
|
||||
properties = Collections.unmodifiableList(props);
|
||||
try {
|
||||
NIFI_PROPERTIES = NiFiProperties.getInstance();
|
||||
|
@ -154,12 +164,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
}
|
||||
|
||||
// variables shared by all threads of this processor
|
||||
// Hadoop Configuration and FileSystem
|
||||
private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
|
||||
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
|
||||
private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
protected void init(ProcessorInitializationContext context) {
|
||||
hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
|
||||
hdfsResources.set(new HdfsResources(null, null, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,8 +183,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
@OnScheduled
|
||||
public final void abstractOnScheduled(ProcessContext context) throws IOException {
|
||||
try {
|
||||
Tuple<Configuration, FileSystem> resources = hdfsResources.get();
|
||||
if (resources.getKey() == null || resources.getValue() == null) {
|
||||
// This value will be null when called from ListHDFS, because it overrides all of the default
|
||||
// properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
|
||||
if (context.getProperty(KERBEROS_RELOGIN_PERIOD).getValue() != null) {
|
||||
kerberosReloginThreshold = context.getProperty(KERBEROS_RELOGIN_PERIOD).asTimePeriod(TimeUnit.SECONDS);
|
||||
}
|
||||
HdfsResources resources = hdfsResources.get();
|
||||
if (resources.getConfiguration() == null) {
|
||||
String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
|
||||
String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
|
||||
dir = dir == null ? "/" : dir;
|
||||
|
@ -183,14 +198,14 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
}
|
||||
} catch (IOException ex) {
|
||||
getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
|
||||
hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
|
||||
hdfsResources.set(new HdfsResources(null, null, null));
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public final void abstractOnStopped() {
|
||||
hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
|
||||
hdfsResources.set(new HdfsResources(null, null, null));
|
||||
}
|
||||
|
||||
private static Configuration getConfigurationFromResources(String configResources) throws IOException {
|
||||
|
@ -224,7 +239,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
/*
|
||||
* Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
|
||||
*/
|
||||
Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
|
||||
HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
|
||||
// org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
|
||||
// later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
|
||||
// NarThreadContextClassLoader.
|
||||
|
@ -244,13 +259,15 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
// If kerberos is enabled, create the file system as the kerberos principal
|
||||
// -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time
|
||||
FileSystem fs = null;
|
||||
UserGroupInformation ugi = null;
|
||||
synchronized (RESOURCES_LOCK) {
|
||||
if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) {
|
||||
String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
|
||||
String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue();
|
||||
UserGroupInformation.setConfiguration(config);
|
||||
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
|
||||
ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
|
||||
fs = getFileSystemAsUser(config, ugi);
|
||||
lastKerberosReloginTime = System.currentTimeMillis() / 1000;
|
||||
} else {
|
||||
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
|
||||
config.set("hadoop.security.authentication", "simple");
|
||||
|
@ -260,7 +277,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
config.set(disableCacheName, "true");
|
||||
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
|
||||
new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
|
||||
return new Tuple<>(config, fs);
|
||||
return new HdfsResources(config, fs, ugi);
|
||||
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(savedClassLoader);
|
||||
|
@ -392,10 +409,59 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
}
|
||||
|
||||
protected Configuration getConfiguration() {
|
||||
return hdfsResources.get().getKey();
|
||||
return hdfsResources.get().getConfiguration();
|
||||
}
|
||||
|
||||
protected FileSystem getFileSystem() {
|
||||
return hdfsResources.get().getValue();
|
||||
// if kerberos is enabled, check if the ticket should be renewed before returning the FS
|
||||
if (hdfsResources.get().getUserGroupInformation() != null && isTicketOld()) {
|
||||
tryKerberosRelogin(hdfsResources.get().getUserGroupInformation());
|
||||
}
|
||||
return hdfsResources.get().getFileSystem();
|
||||
}
|
||||
|
||||
protected void tryKerberosRelogin(UserGroupInformation ugi) {
|
||||
try {
|
||||
getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
|
||||
"attempting to renew ticket for user {}", new Object[]{
|
||||
kerberosReloginThreshold, ugi.getUserName()});
|
||||
ugi.checkTGTAndReloginFromKeytab();
|
||||
lastKerberosReloginTime = System.currentTimeMillis() / 1000;
|
||||
getLogger().info("Kerberos relogin successful or ticket still valid");
|
||||
} catch (IOException e) {
|
||||
// Most likely case of this happening is ticket is expired and error getting a new one,
|
||||
// meaning dfs operations would fail
|
||||
getLogger().error("Kerberos relogin failed", e);
|
||||
throw new ProcessException("Unable to renew kerberos ticket", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean isTicketOld() {
|
||||
return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold;
|
||||
}
|
||||
|
||||
|
||||
static protected class HdfsResources {
|
||||
private final Configuration configuration;
|
||||
private final FileSystem fileSystem;
|
||||
private final UserGroupInformation userGroupInformation;
|
||||
|
||||
public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) {
|
||||
this.configuration = configuration;
|
||||
this.fileSystem = fileSystem;
|
||||
this.userGroupInformation = userGroupInformation;
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
return configuration;
|
||||
}
|
||||
|
||||
public FileSystem getFileSystem() {
|
||||
return fileSystem;
|
||||
}
|
||||
|
||||
public UserGroupInformation getUserGroupInformation() {
|
||||
return userGroupInformation;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue