diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 0431a769d9..3b42802b50 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -370,6 +370,9 @@ language governing permissions and limitations under the License. -->
        <nifi.cluster.manager.flow.retrieval.delay>5 sec</nifi.cluster.manager.flow.retrieval.delay>
        <nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
        <nifi.cluster.manager.safemode.duration>0 sec</nifi.cluster.manager.safemode.duration>
+
+        <!-- nifi.properties: kerberos properties -->
+        <nifi.kerberos.krb5.file />
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 61199c44d6..5ee6c13dc4 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -180,6 +180,9 @@ public class NiFiProperties extends Properties {
public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = "nifi.cluster.manager.protocol.threads";
public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = "nifi.cluster.manager.safemode.duration";
+ // kerberos properties
+ public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file";
+
// defaults
public static final String DEFAULT_TITLE = "NiFi";
public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
@@ -799,6 +802,14 @@ public class NiFiProperties extends Properties {
}
}
+    public File getKerberosConfigurationFile() {
+        final String krb5Path = getProperty(KERBEROS_KRB5_FILE);
+        if (krb5Path != null && krb5Path.trim().length() > 0) {
+            return new File(krb5Path.trim());
+        } else {
+            return null;
+        }
+    }
+
public InetSocketAddress getNodeApiAddress() {
final String rawScheme = getClusterProtocolManagerToNodeApiScheme();
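
For context, the intended call pattern for this new accessor (a minimal sketch; it mirrors the static initializer added to AbstractHadoopProcessor further down and is not itself part of this patch) is to point the JVM's Kerberos implementation at the configured krb5 file before any login is attempted:

    final NiFiProperties properties = NiFiProperties.getInstance();
    final File krb5 = properties.getKerberosConfigurationFile();
    if (krb5 != null) {
        // the JDK's Kerberos provider reads this system property when it initializes
        System.setProperty("java.security.krb5.conf", krb5.getAbsolutePath());
    }
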
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 6eae46b047..54b5283d43 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -162,3 +162,6 @@ nifi.cluster.manager.node.api.request.threads=${nifi.cluster.manager.node.api.re
nifi.cluster.manager.flow.retrieval.delay=${nifi.cluster.manager.flow.retrieval.delay}
nifi.cluster.manager.protocol.threads=${nifi.cluster.manager.protocol.threads}
nifi.cluster.manager.safemode.duration=${nifi.cluster.manager.safemode.duration}
+
+# kerberos #
+nifi.kerberos.krb5.file=${nifi.kerberos.krb5.file}
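
Once the assembly is built with the pom property above, operators enable Kerberos by pointing this entry at the host's Kerberos client configuration. The path below is only an illustrative example; any readable krb5 file works:

    # kerberos #
    nifi.kerberos.krb5.file=/etc/krb5.conf
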
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 6bc110f9d0..04a153492c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -61,5 +61,9 @@
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 548d34cdb3..8519d2c8c6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -28,16 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.util.Tuple;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -50,36 +41,89 @@ import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.Tuple;
/**
* This is a base class that is helpful when building processors interacting with HDFS.
*/
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
+ private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String input, ValidationContext context) {
+ // Check that both the principal & keytab are set before checking the kerberos config
+ if (context.getProperty(KERBEROS_KEYTAB).getValue() == null || context.getProperty(KERBEROS_PRINCIPAL).getValue() == null) {
+ return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("both keytab and principal must be set in order to use Kerberos authentication").build();
+ }
+
+ // Check that the Kerberos configuration is set
+            if (NIFI_PROPERTIES == null || NIFI_PROPERTIES.getKerberosConfigurationFile() == null) {
+ return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+ .explanation("you are missing the nifi.kerberos.krb5.file property in nifi.properties. " + "This must be set in order to use Kerberos").build();
+ }
+
+ // Check that the Kerberos configuration is readable
+ if (!NIFI_PROPERTIES.getKerberosConfigurationFile().canRead()) {
+ return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+ .explanation(String.format("unable to read Kerberos config [%s], please make sure the path is valid " + "and nifi has adequate permissions",
+ NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsoluteFile()))
+ .build();
+ }
+ return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+ }
+ };
// properties
- public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
- .name("Hadoop Configuration Resources")
+ public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("Hadoop Configuration Resources")
.description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
+ "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
- .required(false)
- .addValidator(createMultipleFilesExistValidator())
- .build();
+ .required(false).addValidator(createMultipleFilesExistValidator()).build();
+
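+    // deliberately non-final and public so unit tests can inject a mocked NiFiProperties (see AbstractHadoopTest#testKerberosOptions)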
+ public static NiFiProperties NIFI_PROPERTIES = null;
public static final String DIRECTORY_PROP_NAME = "Directory";
- public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
- .name("Compression codec")
- .required(false)
- .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(),
- GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName())
- .build();
+ public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(false)
+ .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(), GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName()).build();
+
+ public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
+ .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
+ .addValidator(KERBEROS_CONFIG_VALIDATOR).build();
+
+ public static final PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder().name("Kerberos Keytab").required(false)
+ .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
protected static final List<PropertyDescriptor> properties;
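+    // serializes access to UserGroupInformation, whose Kerberos configuration and login state are process-wide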
+ private static final Object RESOURCES_LOCK = new Object();
+
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(HADOOP_CONFIGURATION_RESOURCES);
+ props.add(KERBEROS_PRINCIPAL);
+ props.add(KERBEROS_KEYTAB);
properties = Collections.unmodifiableList(props);
+ try {
+ NIFI_PROPERTIES = NiFiProperties.getInstance();
+ } catch (Exception e) {
+ // This will happen during tests
+ NIFI_PROPERTIES = null;
+ }
+ if (NIFI_PROPERTIES != null && NIFI_PROPERTIES.getKerberosConfigurationFile() != null) {
+ System.setProperty("java.security.krb5.conf", NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsolutePath());
+ }
}
// variables shared by all threads of this processor
@@ -97,8 +141,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
/*
- * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to
- * call super.abstractOnScheduled(context)
+ * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
*/
@OnScheduled
public final void abstractOnScheduled(ProcessContext context) throws IOException {
@@ -108,11 +151,11 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
dir = dir == null ? "/" : dir;
- resources = resetHDFSResources(configResources, dir);
+ resources = resetHDFSResources(configResources, dir, context);
hdfsResources.set(resources);
}
} catch (IOException ex) {
- getLogger().error("HDFS Configuration error - {}", new Object[]{ex});
+ getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
throw ex;
}
@@ -123,10 +166,38 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
}
+ private static Configuration getConfigurationFromResources(String configResources) throws IOException {
+ boolean foundResources = false;
+ final Configuration config = new Configuration();
+ if (null != configResources) {
+ String[] resources = configResources.split(",");
+ for (String resource : resources) {
+ config.addResource(new Path(resource.trim()));
+ foundResources = true;
+ }
+ }
+
+ if (!foundResources) {
+ // check that at least 1 non-default resource is available on the classpath
+ String configStr = config.toString();
+ for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
+ if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
+ foundResources = true;
+ break;
+ }
+ }
+ }
+
+ if (!foundResources) {
+ throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
+ }
+ return config;
+ }
+
/*
* Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
*/
-    Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, String dir) throws IOException {
+    Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
// org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
// later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
// NarThreadContextClassLoader.
@@ -134,30 +205,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
try {
- boolean foundResources = false;
- final Configuration config = new Configuration();
- if (null != configResources) {
- String[] resources = configResources.split(",");
- for (String resource : resources) {
- config.addResource(new Path(resource.trim()));
- foundResources = true;
- }
- }
-
- if (!foundResources) {
- // check that at least 1 non-default resource is available on the classpath
- String configStr = config.toString();
- for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
- if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
- foundResources = true;
- break;
- }
- }
- }
-
- if (!foundResources) {
- throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
- }
+ Configuration config = getConfigurationFromResources(configResources);
// first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
checkHdfsUriForTimeout(config);
@@ -165,13 +213,26 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
// disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
// restart
String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
- config.set(disableCacheName, "true");
- final FileSystem fs = getFileSystem(config);
- getLogger().info(
- "Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
- new Object[]{fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)),
- fs.getDefaultReplication(new Path(dir)), config.toString()});
+        // If kerberos is enabled, create the file system as the kerberos principal
+        // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
+ FileSystem fs = null;
+ synchronized (RESOURCES_LOCK) {
+ if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) {
+ String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
+ String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue();
+ UserGroupInformation.setConfiguration(config);
+ UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
+ fs = getFileSystemAsUser(config, ugi);
+ } else {
+ config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
+ config.set("hadoop.security.authentication", "simple");
+ fs = getFileSystem(config);
+ }
+ }
+ config.set(disableCacheName, "true");
+ getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
+ new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
return new Tuple<>(config, fs);
} finally {
@@ -180,17 +241,31 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
/**
- * This exists in order to allow unit tests to override it so that they don't take several minutes waiting
- * for UDP packets to be received
+ * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received
*
- * @param config the configuration to use
+ * @param config
+ * the configuration to use
* @return the FileSystem that is created for the given Configuration
- * @throws IOException if unable to create the FileSystem
+ * @throws IOException
+ * if unable to create the FileSystem
*/
protected FileSystem getFileSystem(final Configuration config) throws IOException {
return FileSystem.get(config);
}
+ protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
+ try {
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ @Override
+ public FileSystem run() throws Exception {
+ return FileSystem.get(config);
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException("Unable to create file system: " + e.getMessage());
+ }
+ }
+
/*
* Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
*/
@@ -227,13 +302,11 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
final boolean valid = file.exists() && file.isFile();
if (!valid) {
final String message = "File " + file + " does not exist or is not a file";
- return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message)
- .build();
+ return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
} catch (SecurityException e) {
final String message = "Unable to access " + filename + " due to " + e.getMessage();
- return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message)
- .build();
+ return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
@@ -245,8 +318,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
/**
* Returns the configured CompressionCodec, or null if none is configured.
*
- * @param context the ProcessContext
- * @param configuration the Hadoop Configuration
+ * @param context
+ * the ProcessContext
+ * @param configuration
+ * the Hadoop Configuration
* @return CompressionCodec or null
*/
protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
@@ -261,10 +336,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
/**
- * Returns the relative path of the child that does not include the filename
- * or the root path.
- * @param root the path to relativize from
- * @param child the path to relativize
+ * Returns the relative path of the child that does not include the filename or the root path.
+ *
+ * @param root
+ * the path to relativize from
+ * @param child
+ * the path to relativize
* @return the relative path
*/
public static String getPathDifference(final Path root, final Path child) {
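
The unit test below drives SimpleHadoopProcessor, a test-only subclass that is not included in this patch. For readers following along, a rough sketch of what such a subclass looks like (illustrative only; the real fixture may differ) is simply a concrete processor that exposes the shared descriptors and does nothing in onTrigger:

    import java.util.List;

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    public class SimpleHadoopProcessor extends AbstractHadoopProcessor {

        @Override
        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            // exposes Hadoop Configuration Resources, Kerberos Principal and Kerberos Keytab to the framework
            return properties;
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            // no-op: the tests only exercise validation and never trigger the processor
        }
    }
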
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index e1b782716f..c47c7a07d7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -16,24 +16,26 @@
*/
package org.apache.nifi.processors.hadoop;
-import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class AbstractHadoopTest {
private static Logger logger;
@@ -82,9 +84,37 @@ public class AbstractHadoopTest {
TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class);
SimpleHadoopProcessor processor = (SimpleHadoopProcessor) runner.getProcessor();
try {
- processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target");
+ processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target", runner.getProcessContext());
Assert.fail("Should have thrown SocketTimeoutException");
} catch (IOException e) {
}
}
+
+ @Test
+ public void testKerberosOptions() throws Exception {
+ File temporaryFile = File.createTempFile("hadoop-test", ".properties");
+ try {
+ // mock properties and return a temporary file for the kerberos configuration
+ NiFiProperties mockedProperties = mock(NiFiProperties.class);
+ when(mockedProperties.getKerberosConfigurationFile()).thenReturn(temporaryFile);
+ SimpleHadoopProcessor.NIFI_PROPERTIES = mockedProperties;
+ TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class);
+ // should be valid since no kerberos options specified
+ runner.assertValid();
+ // no longer valid since only the principal is provided
+ runner.setProperty(SimpleHadoopProcessor.KERBEROS_PRINCIPAL, "principal");
+ runner.assertNotValid();
+ // invalid since the keytab does not exist
+ runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, "BAD_KEYTAB_PATH");
+ runner.assertNotValid();
+ // valid since keytab is now a valid file location
+ runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, temporaryFile.getAbsolutePath());
+ runner.assertValid();
+ // invalid since the kerberos configuration was changed to a non-existent file
+ when(mockedProperties.getKerberosConfigurationFile()).thenReturn(new File("BAD_KERBEROS_PATH"));
+ runner.assertNotValid();
+ } finally {
+ temporaryFile.delete();
+ }
+ }
}