mirror of https://github.com/apache/nifi.git
NIFI-866: Add Kerberos Support for Hadoop
- Add krb5.conf to nifi.properties: nifi.kerberos.krb5.file = path to krb5.conf.
- Connections to secure Hadoop clusters are determined by their configuration; that is, hadoop.security.authentication should be set to kerberos.
- Added two optional properties to AbstractHadoopProcessor (Kerberos Principal and Kerberos Keytab); these are required only if the cluster you are connecting to is secured. Both options require krb5.conf to be set in nifi.properties.

Signed-off-by: Bryan Bende <bbende@apache.org>
commit 7fb6e884a7
parent c7f7704220
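For orientation, a minimal configuration sketch of what this change expects. The file path, principal, and realm below are illustrative examples, not values taken from this commit:

    # nifi.properties -- point NiFi at the client krb5.conf
    nifi.kerberos.krb5.file=/etc/krb5.conf

    # The Hadoop client configuration supplied through "Hadoop Configuration Resources"
    # decides whether the secure path is taken; core-site.xml must contain:
    #   <property>
    #     <name>hadoop.security.authentication</name>
    #     <value>kerberos</value>
    #   </property>
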
@@ -370,6 +370,9 @@ language governing permissions and limitations under the License. -->
         <nifi.cluster.manager.flow.retrieval.delay>5 sec</nifi.cluster.manager.flow.retrieval.delay>
         <nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
         <nifi.cluster.manager.safemode.duration>0 sec</nifi.cluster.manager.safemode.duration>
+
+        <!-- nifi.properties: kerberos properties -->
+        <nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file>
     </properties>
     <profiles>
         <profile>

@@ -180,6 +180,9 @@ public class NiFiProperties extends Properties {
     public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = "nifi.cluster.manager.protocol.threads";
     public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = "nifi.cluster.manager.safemode.duration";
 
+    // kerberos properties
+    public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file";
+
     // defaults
     public static final String DEFAULT_TITLE = "NiFi";
     public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;

@@ -799,6 +802,14 @@ public class NiFiProperties extends Properties {
         }
     }
 
+    public File getKerberosConfigurationFile() {
+        if (getProperty(KERBEROS_KRB5_FILE).trim().length() > 0) {
+            return new File(getProperty(KERBEROS_KRB5_FILE));
+        } else {
+            return null;
+        }
+    }
+
     public InetSocketAddress getNodeApiAddress() {
 
         final String rawScheme = getClusterProtocolManagerToNodeApiScheme();

@@ -162,3 +162,6 @@ nifi.cluster.manager.node.api.request.threads=${nifi.cluster.manager.node.api.request.threads}
 nifi.cluster.manager.flow.retrieval.delay=${nifi.cluster.manager.flow.retrieval.delay}
 nifi.cluster.manager.protocol.threads=${nifi.cluster.manager.protocol.threads}
 nifi.cluster.manager.safemode.duration=${nifi.cluster.manager.safemode.duration}
+
+# kerberos #
+nifi.kerberos.krb5.file=${nifi.kerberos.krb5.file}

@@ -61,5 +61,9 @@
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
     </dependencies>
 </project>

@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;

@@ -28,16 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.util.Tuple;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;

@@ -50,36 +41,89 @@ import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.Lz4Codec;
 import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.Tuple;
 
 /**
  * This is a base class that is helpful when building processors interacting with HDFS.
  */
 public abstract class AbstractHadoopProcessor extends AbstractProcessor {
 
+    private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            // Check that both the principal & keytab are set before checking the kerberos config
+            if (context.getProperty(KERBEROS_KEYTAB).getValue() == null || context.getProperty(KERBEROS_PRINCIPAL).getValue() == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("both keytab and principal must be set in order to use Kerberos authentication").build();
+            }
+
+            // Check that the Kerberos configuration is set
+            if (NIFI_PROPERTIES.getKerberosConfigurationFile() == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+                        .explanation("you are missing the nifi.kerberos.krb5.file property in nifi.properties. " + "This must be set in order to use Kerberos").build();
+            }
+
+            // Check that the Kerberos configuration is readable
+            if (!NIFI_PROPERTIES.getKerberosConfigurationFile().canRead()) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+                        .explanation(String.format("unable to read Kerberos config [%s], please make sure the path is valid " + "and nifi has adequate permissions",
+                                NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsoluteFile()))
+                        .build();
+            }
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+    };
+
     // properties
-    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
-            .name("Hadoop Configuration Resources")
+    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("Hadoop Configuration Resources")
             .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
                     + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
-            .required(false)
-            .addValidator(createMultipleFilesExistValidator())
-            .build();
+            .required(false).addValidator(createMultipleFilesExistValidator()).build();
+
+    public static NiFiProperties NIFI_PROPERTIES = null;
 
     public static final String DIRECTORY_PROP_NAME = "Directory";
 
-    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
-            .name("Compression codec")
-            .required(false)
-            .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(),
-                    GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName())
-            .build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(false)
+            .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(), GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName()).build();
+
+    public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
+            .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
+            .addValidator(KERBEROS_CONFIG_VALIDATOR).build();
+
+    public static final PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder().name("Kerberos Keytab").required(false)
+            .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
 
     protected static final List<PropertyDescriptor> properties;
 
+    private static final Object RESOURCES_LOCK = new Object();
+
     static {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HADOOP_CONFIGURATION_RESOURCES);
+        props.add(KERBEROS_PRINCIPAL);
+        props.add(KERBEROS_KEYTAB);
         properties = Collections.unmodifiableList(props);
+        try {
+            NIFI_PROPERTIES = NiFiProperties.getInstance();
+        } catch (Exception e) {
+            // This will happen during tests
+            NIFI_PROPERTIES = null;
+        }
+        if (NIFI_PROPERTIES != null && NIFI_PROPERTIES.getKerberosConfigurationFile() != null) {
+            System.setProperty("java.security.krb5.conf", NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsolutePath());
+        }
     }
 
     // variables shared by all threads of this processor

@@ -97,8 +141,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
 
     /*
-     * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to
-     * call super.abstractOnScheduled(context)
+     * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
      */
     @OnScheduled
     public final void abstractOnScheduled(ProcessContext context) throws IOException {

@@ -108,11 +151,11 @@
                 String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
                 String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
                 dir = dir == null ? "/" : dir;
-                resources = resetHDFSResources(configResources, dir);
+                resources = resetHDFSResources(configResources, dir, context);
                 hdfsResources.set(resources);
             }
         } catch (IOException ex) {
-            getLogger().error("HDFS Configuration error - {}", new Object[]{ex});
+            getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
             hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
             throw ex;
         }

@@ -123,10 +166,38 @@
         hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
     }
 
+    private static Configuration getConfigurationFromResources(String configResources) throws IOException {
+        boolean foundResources = false;
+        final Configuration config = new Configuration();
+        if (null != configResources) {
+            String[] resources = configResources.split(",");
+            for (String resource : resources) {
+                config.addResource(new Path(resource.trim()));
+                foundResources = true;
+            }
+        }
+
+        if (!foundResources) {
+            // check that at least 1 non-default resource is available on the classpath
+            String configStr = config.toString();
+            for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
+                if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
+                    foundResources = true;
+                    break;
+                }
+            }
+        }
+
+        if (!foundResources) {
+            throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
+        }
+        return config;
+    }
+
     /*
      * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
      */
-    Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, String dir) throws IOException {
+    Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
         // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
         // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
         // NarThreadContextClassLoader.

@@ -134,30 +205,7 @@
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
 
         try {
-            boolean foundResources = false;
-            final Configuration config = new Configuration();
-            if (null != configResources) {
-                String[] resources = configResources.split(",");
-                for (String resource : resources) {
-                    config.addResource(new Path(resource.trim()));
-                    foundResources = true;
-                }
-            }
-
-            if (!foundResources) {
-                // check that at least 1 non-default resource is available on the classpath
-                String configStr = config.toString();
-                for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
-                    if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
-                        foundResources = true;
-                        break;
-                    }
-                }
-            }
-
-            if (!foundResources) {
-                throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
-            }
+            Configuration config = getConfigurationFromResources(configResources);
 
             // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
             checkHdfsUriForTimeout(config);

@@ -165,13 +213,26 @@
             // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
             // restart
             String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
-            config.set(disableCacheName, "true");
 
-            final FileSystem fs = getFileSystem(config);
-            getLogger().info(
-                    "Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
-                    new Object[]{fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)),
-                            fs.getDefaultReplication(new Path(dir)), config.toString()});
+            // If kerberos is enabled, create the file system as the kerberos principal
+            // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
+            FileSystem fs = null;
+            synchronized (RESOURCES_LOCK) {
+                if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) {
+                    String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
+                    String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue();
+                    UserGroupInformation.setConfiguration(config);
+                    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
+                    fs = getFileSystemAsUser(config, ugi);
+                } else {
+                    config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
+                    config.set("hadoop.security.authentication", "simple");
+                    fs = getFileSystem(config);
+                }
+            }
+            config.set(disableCacheName, "true");
+            getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
+                    new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
             return new Tuple<>(config, fs);
 
         } finally {

@@ -180,17 +241,31 @@
     }
 
     /**
-     * This exists in order to allow unit tests to override it so that they don't take several minutes waiting
-     * for UDP packets to be received
+     * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received
      *
-     * @param config the configuration to use
+     * @param config
+     *            the configuration to use
      * @return the FileSystem that is created for the given Configuration
-     * @throws IOException if unable to create the FileSystem
+     * @throws IOException
+     *             if unable to create the FileSystem
      */
     protected FileSystem getFileSystem(final Configuration config) throws IOException {
         return FileSystem.get(config);
     }
 
+    protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                @Override
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(config);
+                }
+            });
+        } catch (InterruptedException e) {
+            throw new IOException("Unable to create file system: " + e.getMessage());
+        }
+    }
+
     /*
      * Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
      */

@@ -227,13 +302,11 @@
                 final boolean valid = file.exists() && file.isFile();
                 if (!valid) {
                     final String message = "File " + file + " does not exist or is not a file";
-                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message)
-                            .build();
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
                 }
             } catch (SecurityException e) {
                 final String message = "Unable to access " + filename + " due to " + e.getMessage();
-                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message)
-                        .build();
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
             }
         }
         return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();

@@ -245,8 +318,10 @@
     /**
      * Returns the configured CompressionCodec, or null if none is configured.
      *
-     * @param context the ProcessContext
-     * @param configuration the Hadoop Configuration
+     * @param context
+     *            the ProcessContext
+     * @param configuration
+     *            the Hadoop Configuration
      * @return CompressionCodec or null
      */
     protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {

@@ -261,10 +336,12 @@
     }
 
     /**
-     * Returns the relative path of the child that does not include the filename
-     * or the root path.
-     * @param root the path to relativize from
-     * @param child the path to relativize
+     * Returns the relative path of the child that does not include the filename or the root path.
+     *
+     * @param root
+     *            the path to relativize from
+     * @param child
+     *            the path to relativize
      * @return the relative path
      */
     public static String getPathDifference(final Path root, final Path child) {

@@ -16,24 +16,26 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class AbstractHadoopTest {
 
     private static Logger logger;

@@ -82,9 +84,37 @@ public class AbstractHadoopTest {
         TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class);
         SimpleHadoopProcessor processor = (SimpleHadoopProcessor) runner.getProcessor();
         try {
-            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target");
+            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target", runner.getProcessContext());
             Assert.fail("Should have thrown SocketTimeoutException");
         } catch (IOException e) {
         }
     }
 
+    @Test
+    public void testKerberosOptions() throws Exception {
+        File temporaryFile = File.createTempFile("hadoop-test", ".properties");
+        try {
+            // mock properties and return a temporary file for the kerberos configuration
+            NiFiProperties mockedProperties = mock(NiFiProperties.class);
+            when(mockedProperties.getKerberosConfigurationFile()).thenReturn(temporaryFile);
+            SimpleHadoopProcessor.NIFI_PROPERTIES = mockedProperties;
+            TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class);
+            // should be valid since no kerberos options specified
+            runner.assertValid();
+            // no longer valid since only the principal is provided
+            runner.setProperty(SimpleHadoopProcessor.KERBEROS_PRINCIPAL, "principal");
+            runner.assertNotValid();
+            // invalid since the keytab does not exist
+            runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, "BAD_KEYTAB_PATH");
+            runner.assertNotValid();
+            // valid since keytab is now a valid file location
+            runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, temporaryFile.getAbsolutePath());
+            runner.assertValid();
+            // invalid since the kerberos configuration was changed to a non-existent file
+            when(mockedProperties.getKerberosConfigurationFile()).thenReturn(new File("BAD_KERBEROS_PATH"));
+            runner.assertNotValid();
+        } finally {
+            temporaryFile.delete();
+        }
+    }
 }
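
Taken together, the runtime behavior sketched by this diff is: when the supplied Hadoop configuration declares hadoop.security.authentication=kerberos, the processor logs in from the configured keytab and creates the FileSystem inside ugi.doAs(); otherwise it forces simple authentication and behaves as before. An illustrative processor configuration for a secured cluster (values are examples, not from this commit):

    Hadoop Configuration Resources = /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
    Kerberos Principal             = nifi@EXAMPLE.COM
    Kerberos Keytab                = /etc/security/keytabs/nifi.headless.keytab

Both Kerberos properties are optional and only needed when the target cluster is secured.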