From 7fb6e884a72acfd32c97b96058909c0cf6434061 Mon Sep 17 00:00:00 2001
From: ricky
Date: Thu, 13 Aug 2015 14:54:15 -0400
Subject: [PATCH] NIFI-866: Add Kerberos Support for Hadoop

- Add a krb5.conf reference to nifi.properties:
  nifi.kerberos.krb5.file | path to krb5.conf
- Whether a connection targets a secure Hadoop cluster is determined by the
  cluster's own configuration; hadoop.security.authentication should be set
  to kerberos.
- Add two optional properties to AbstractHadoopProcessor (principal and
  keytab); these are required only if the cluster being connected to is
  secured. Both options require the krb5.conf path to be set in
  nifi.properties.

Signed-off-by: Bryan Bende
---
 nifi-assembly/pom.xml                              |   3 +
 .../org/apache/nifi/util/NiFiProperties.java       |  11 +
 .../src/main/resources/conf/nifi.properties        |   3 +
 .../nifi-hdfs-processors/pom.xml                   |   4 +
 .../hadoop/AbstractHadoopProcessor.java            | 217 ++++++++++++------
 .../processors/hadoop/AbstractHadoopTest.java      |  46 +++-
 6 files changed, 206 insertions(+), 78 deletions(-)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 0431a769d9..3b42802b50 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -370,6 +370,9 @@ language governing permissions and limitations under the License. -->
         <nifi.cluster.manager.flow.retrieval.delay>5 sec</nifi.cluster.manager.flow.retrieval.delay>
         <nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
         <nifi.cluster.manager.safemode.duration>0 sec</nifi.cluster.manager.safemode.duration>
+
+        <!-- kerberos properties -->
+        <nifi.kerberos.krb5.file />
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 61199c44d6..5ee6c13dc4 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -180,6 +180,9 @@ public class NiFiProperties extends Properties {
     public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = "nifi.cluster.manager.protocol.threads";
     public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = "nifi.cluster.manager.safemode.duration";
 
+    // kerberos properties
+    public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file";
+
     // defaults
     public static final String DEFAULT_TITLE = "NiFi";
     public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
@@ -799,6 +802,14 @@ public class NiFiProperties extends Properties {
         }
     }
 
+    public File getKerberosConfigurationFile() {
+        if (getProperty(KERBEROS_KRB5_FILE).trim().length() > 0) {
+            return new File(getProperty(KERBEROS_KRB5_FILE));
+        } else {
+            return null;
+        }
+    }
+
     public InetSocketAddress getNodeApiAddress() {
 
         final String rawScheme = getClusterProtocolManagerToNodeApiScheme();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 6eae46b047..54b5283d43 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -162,3 +162,6 @@ nifi.cluster.manager.node.api.request.threads=${nifi.cluster.manager.node.api.request.threads}
 nifi.cluster.manager.flow.retrieval.delay=${nifi.cluster.manager.flow.retrieval.delay}
 nifi.cluster.manager.protocol.threads=${nifi.cluster.manager.protocol.threads}
 nifi.cluster.manager.safemode.duration=${nifi.cluster.manager.safemode.duration}
+
+# kerberos #
+nifi.kerberos.krb5.file=${nifi.kerberos.krb5.file}
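With the framework changes above in place, an operator enables Kerberos support by pointing the new property at the host's Kerberos configuration in conf/nifi.properties. The path below is only an illustrative example, not a value shipped with this patch:

    # kerberos #
    nifi.kerberos.krb5.file=/etc/krb5.conf

If the property is left empty, getKerberosConfigurationFile() returns null and the Kerberos processor properties fail validation.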
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 6bc110f9d0..04a153492c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -61,5 +61,9 @@
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
     </dependencies>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 548d34cdb3..8519d2c8c6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -28,16 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.util.Tuple;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -50,36 +41,89 @@ import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.Lz4Codec;
 import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.Tuple;
 
 /**
  * This is a base class that is helpful when building processors interacting with HDFS.
 */
 public abstract class AbstractHadoopProcessor extends AbstractProcessor {
+    private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            // Check that both the principal & keytab are set before checking the kerberos config
+            if (context.getProperty(KERBEROS_KEYTAB).getValue() == null || context.getProperty(KERBEROS_PRINCIPAL).getValue() == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("both keytab and principal must be set in order to use Kerberos authentication").build();
+            }
+
+            // Check that the Kerberos configuration is set
+            if (NIFI_PROPERTIES.getKerberosConfigurationFile() == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+                        .explanation("you are missing the nifi.kerberos.krb5.file property in nifi.properties. " + "This must be set in order to use Kerberos").build();
+            }
+
+            // Check that the Kerberos configuration is readable
+            if (!NIFI_PROPERTIES.getKerberosConfigurationFile().canRead()) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+                        .explanation(String.format("unable to read Kerberos config [%s], please make sure the path is valid " + "and nifi has adequate permissions",
+                                NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsoluteFile()))
+                        .build();
+            }
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+    };
 
     // properties
-    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
-            .name("Hadoop Configuration Resources")
+    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("Hadoop Configuration Resources")
             .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
                     + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
-            .required(false)
-            .addValidator(createMultipleFilesExistValidator())
-            .build();
+            .required(false).addValidator(createMultipleFilesExistValidator()).build();
+
+    public static NiFiProperties NIFI_PROPERTIES = null;
 
     public static final String DIRECTORY_PROP_NAME = "Directory";
 
-    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
-            .name("Compression codec")
-            .required(false)
-            .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(),
-                    GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName())
-            .build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(false)
+            .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(), GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName()).build();
+
+    public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
+            .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
+            .addValidator(KERBEROS_CONFIG_VALIDATOR).build();
+
+    public static final PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder().name("Kerberos Keytab").required(false)
+            .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
 
     protected static final List<PropertyDescriptor> properties;
 
+    private static final Object RESOURCES_LOCK = new Object();
+
     static {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HADOOP_CONFIGURATION_RESOURCES);
+        props.add(KERBEROS_PRINCIPAL);
+        props.add(KERBEROS_KEYTAB);
         properties = Collections.unmodifiableList(props);
+        try {
+            NIFI_PROPERTIES = NiFiProperties.getInstance();
+        } catch (Exception e) {
+            // This will happen during tests
+            NIFI_PROPERTIES = null;
+        }
+        if (NIFI_PROPERTIES != null && NIFI_PROPERTIES.getKerberosConfigurationFile() != null) {
+            System.setProperty("java.security.krb5.conf", NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsolutePath());
+        }
     }
 
     // variables shared by all threads of this processor
@@ -97,8 +141,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
 
     /*
-     * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to
-     * call super.abstractOnScheduled(context)
+     * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
      */
     @OnScheduled
    public final void abstractOnScheduled(ProcessContext context) throws IOException {
@@ -108,11 +151,11 @@
                 String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
                 String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
                 dir = dir == null ? "/" : dir;
"/" : dir; - resources = resetHDFSResources(configResources, dir); + resources = resetHDFSResources(configResources, dir, context); hdfsResources.set(resources); } } catch (IOException ex) { - getLogger().error("HDFS Configuration error - {}", new Object[]{ex}); + getLogger().error("HDFS Configuration error - {}", new Object[] { ex }); hdfsResources.set(new Tuple(null, null)); throw ex; } @@ -123,10 +166,38 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { hdfsResources.set(new Tuple(null, null)); } + private static Configuration getConfigurationFromResources(String configResources) throws IOException { + boolean foundResources = false; + final Configuration config = new Configuration(); + if (null != configResources) { + String[] resources = configResources.split(","); + for (String resource : resources) { + config.addResource(new Path(resource.trim())); + foundResources = true; + } + } + + if (!foundResources) { + // check that at least 1 non-default resource is available on the classpath + String configStr = config.toString(); + for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) { + if (!resource.contains("default") && config.getResource(resource.trim()) != null) { + foundResources = true; + break; + } + } + } + + if (!foundResources) { + throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath"); + } + return config; + } + /* * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources. */ - Tuple resetHDFSResources(String configResources, String dir) throws IOException { + Tuple resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException { // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical // NarThreadContextClassLoader. 
@@ -134,30 +205,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         try {
-            boolean foundResources = false;
-            final Configuration config = new Configuration();
-            if (null != configResources) {
-                String[] resources = configResources.split(",");
-                for (String resource : resources) {
-                    config.addResource(new Path(resource.trim()));
-                    foundResources = true;
-                }
-            }
-
-            if (!foundResources) {
-                // check that at least 1 non-default resource is available on the classpath
-                String configStr = config.toString();
-                for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
-                    if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
-                        foundResources = true;
-                        break;
-                    }
-                }
-            }
-
-            if (!foundResources) {
-                throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
-            }
+            Configuration config = getConfigurationFromResources(configResources);
 
             // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
             checkHdfsUriForTimeout(config);
@@ -165,13 +213,26 @@
             // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
             // restart
             String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
-            config.set(disableCacheName, "true");
-            final FileSystem fs = getFileSystem(config);
-            getLogger().info(
-                    "Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
-                    new Object[]{fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)),
-                            fs.getDefaultReplication(new Path(dir)), config.toString()});
+
+            // If kerberos is enabled, create the file system as the kerberos principal
+            // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
+            FileSystem fs = null;
+            synchronized (RESOURCES_LOCK) {
+                if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) {
+                    String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
+                    String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue();
+                    UserGroupInformation.setConfiguration(config);
+                    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
+                    fs = getFileSystemAsUser(config, ugi);
+                } else {
+                    config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
+                    config.set("hadoop.security.authentication", "simple");
+                    fs = getFileSystem(config);
+                }
+            }
+            config.set(disableCacheName, "true");
+            getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
+                    new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
 
             return new Tuple<>(config, fs);
         } finally {
@@ -180,17 +241,31 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
 
     /**
-     * This exists in order to allow unit tests to override it so that they don't take several minutes waiting
-     * for UDP packets to be received
+     * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received
      *
-     * @param config the configuration to use
+     * @param config
+     *            the configuration to use
      * @return the FileSystem that is created for the given Configuration
-     * @throws IOException if unable to create the FileSystem
+     * @throws IOException
+     *             if unable to create the FileSystem
      */
     protected FileSystem getFileSystem(final Configuration config) throws IOException {
         return FileSystem.get(config);
     }
 
+    protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                @Override
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(config);
+                }
+            });
+        } catch (InterruptedException e) {
+            throw new IOException("Unable to create file system: " + e.getMessage());
+        }
+    }
+
     /*
      * Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
     */
@@ -227,13 +302,11 @@
                 final boolean valid = file.exists() && file.isFile();
                 if (!valid) {
                     final String message = "File " + file + " does not exist or is not a file";
-                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message)
-                            .build();
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
                 }
             } catch (SecurityException e) {
                 final String message = "Unable to access " + filename + " due to " + e.getMessage();
-                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message)
-                        .build();
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
             }
         }
         return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
@@ -245,8 +318,10 @@
     /**
      * Returns the configured CompressionCodec, or null if none is configured.
     *
-     * @param context the ProcessContext
-     * @param configuration the Hadoop Configuration
+     * @param context
+     *            the ProcessContext
+     * @param configuration
+     *            the Hadoop Configuration
      * @return CompressionCodec or null
      */
     protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
@@ -261,10 +336,12 @@
     }
 
     /**
-     * Returns the relative path of the child that does not include the filename
-     * or the root path.
-     * @param root the path to relativize from
-     * @param child the path to relativize
+     * Returns the relative path of the child that does not include the filename or the root path.
+     *
+     * @param root
+     *            the path to relativize from
+     * @param child
+     *            the path to relativize
      * @return the relative path
      */
     public static String getPathDifference(final Path root, final Path child) {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index e1b782716f..c47c7a07d7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -16,24 +16,26 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class AbstractHadoopTest {
 
     private static Logger logger;
@@ -82,9 +84,37 @@ public class AbstractHadoopTest {
         TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class);
         SimpleHadoopProcessor processor = (SimpleHadoopProcessor) runner.getProcessor();
         try {
-            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target");
+            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target", runner.getProcessContext());
             Assert.fail("Should have thrown SocketTimeoutException");
         } catch (IOException e) {
         }
     }
+
+    @Test
+    public void testKerberosOptions() throws Exception {
+        File temporaryFile = File.createTempFile("hadoop-test", ".properties");
+        try {
+            // mock properties and return a temporary file for the kerberos configuration
+            NiFiProperties mockedProperties = mock(NiFiProperties.class);
+            when(mockedProperties.getKerberosConfigurationFile()).thenReturn(temporaryFile);
+            SimpleHadoopProcessor.NIFI_PROPERTIES = mockedProperties;
+            TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class);
+            // should be valid since no kerberos options are specified
+            runner.assertValid();
+            // no longer valid since only the principal is provided
+            runner.setProperty(SimpleHadoopProcessor.KERBEROS_PRINCIPAL, "principal");
+            runner.assertNotValid();
+            // invalid since the keytab does not exist
+            runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, "BAD_KEYTAB_PATH");
+            runner.assertNotValid();
+            // valid since the keytab is now a valid file location
+            runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, temporaryFile.getAbsolutePath());
+            runner.assertValid();
+            // invalid since the kerberos configuration was changed to a non-existent file
+            when(mockedProperties.getKerberosConfigurationFile()).thenReturn(new File("BAD_KERBEROS_PATH"));
+            runner.assertNotValid();
+        } finally {
+            temporaryFile.delete();
+        }
+    }
 }
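For readers unfamiliar with the UserGroupInformation calls introduced above, the following standalone sketch shows the same login-from-keytab and doAs pattern outside of NiFi. It is illustrative only: the principal, keytab, and krb5.conf paths are placeholders, and the hadoop.security.authentication setting would normally come from core-site.xml rather than being set in code.

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberosHdfsSketch {

        public static void main(String[] args) throws Exception {
            // Point the JVM at the Kerberos configuration, as the processor's static
            // initializer does with the value of nifi.kerberos.krb5.file (placeholder path).
            System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

            // In a real deployment this comes from core-site.xml / hdfs-site.xml; it is
            // set directly here only to keep the sketch self-contained.
            final Configuration config = new Configuration();
            config.set("hadoop.security.authentication", "kerberos");

            // Log in from a keytab and obtain a UGI for that principal (placeholder values).
            UserGroupInformation.setConfiguration(config);
            UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                    "nifi@EXAMPLE.COM", "/etc/security/keytabs/nifi.keytab");

            // Create the FileSystem inside doAs so the call carries the Kerberos
            // credentials, mirroring getFileSystemAsUser() above.
            FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                @Override
                public FileSystem run() throws Exception {
                    return FileSystem.get(config);
                }
            });

            System.out.println("Connected as " + ugi.getUserName() + ", home dir: " + fs.getHomeDirectory());
        }
    }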