HDFS-9780. RollingFileSystemSink doesn't work on secure clusters. (Daniel Templeton via kasha)
(cherry picked from commit 33ce7f6c07
)
This commit is contained in:
parent
d3a0bb6d32
commit
383f3ff28c
|
@ -47,6 +47,8 @@ import org.apache.hadoop.metrics2.MetricsException;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||||
import org.apache.hadoop.metrics2.MetricsSink;
|
import org.apache.hadoop.metrics2.MetricsSink;
|
||||||
import org.apache.hadoop.metrics2.MetricsTag;
|
import org.apache.hadoop.metrics2.MetricsTag;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>This class is a metrics sink that uses
|
* <p>This class is a metrics sink that uses
|
||||||
|
@ -107,6 +109,14 @@ import org.apache.hadoop.metrics2.MetricsTag;
|
||||||
* the data is being written successfully. This is a known HDFS limitation that
|
* the data is being written successfully. This is a known HDFS limitation that
|
||||||
* exists because of the performance cost of updating the metadata. See
|
* exists because of the performance cost of updating the metadata. See
|
||||||
* <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
|
* <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
|
||||||
|
*
|
||||||
|
* <p>When using this sink in a secure (Kerberos) environment, two additional
|
||||||
|
* properties must be set: <code>keytab-key</code> and
|
||||||
|
* <code>principal-key</code>. <code>keytab-key</code> should contain the key by
|
||||||
|
* which the keytab file can be found in the configuration, for example,
|
||||||
|
* <code>yarn.nodemanager.keytab</code>. <code>principal-key</code> should
|
||||||
|
* contain the key by which the principal can be found in the configuration,
|
||||||
|
* for example, <code>yarn.nodemanager.principal</code>.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
|
@ -115,6 +125,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
private static final String SOURCE_KEY = "source";
|
private static final String SOURCE_KEY = "source";
|
||||||
private static final String IGNORE_ERROR_KEY = "ignore-error";
|
private static final String IGNORE_ERROR_KEY = "ignore-error";
|
||||||
private static final String ALLOW_APPEND_KEY = "allow-append";
|
private static final String ALLOW_APPEND_KEY = "allow-append";
|
||||||
|
private static final String KEYTAB_PROPERTY_KEY = "keytab-key";
|
||||||
|
private static final String USERNAME_PROPERTY_KEY = "principal-key";
|
||||||
private static final String SOURCE_DEFAULT = "unknown";
|
private static final String SOURCE_DEFAULT = "unknown";
|
||||||
private static final String BASEPATH_DEFAULT = "/tmp";
|
private static final String BASEPATH_DEFAULT = "/tmp";
|
||||||
private static final FastDateFormat DATE_FORMAT =
|
private static final FastDateFormat DATE_FORMAT =
|
||||||
|
@ -134,10 +146,21 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
// We keep this only to be able to call hsynch() on it.
|
// We keep this only to be able to call hsynch() on it.
|
||||||
private FSDataOutputStream currentFSOutStream;
|
private FSDataOutputStream currentFSOutStream;
|
||||||
private Timer flushTimer;
|
private Timer flushTimer;
|
||||||
|
|
||||||
|
// This flag is used during testing to make the flusher thread run after only
|
||||||
|
// a short pause instead of waiting for the top of the hour.
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected static boolean isTest = false;
|
protected static boolean flushQuickly = false;
|
||||||
|
// This flag is used by the flusher thread to indicate that it has run. Used
|
||||||
|
// only for testing purposes.
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected static volatile boolean hasFlushed = false;
|
protected static volatile boolean hasFlushed = false;
|
||||||
|
// Use this configuration instead of loading a new one.
|
||||||
|
@VisibleForTesting
|
||||||
|
protected static Configuration suppliedConf = null;
|
||||||
|
// Use this file system instead of getting a new one.
|
||||||
|
@VisibleForTesting
|
||||||
|
protected static FileSystem suppliedFilesystem = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(SubsetConfiguration conf) {
|
public void init(SubsetConfiguration conf) {
|
||||||
|
@ -146,15 +169,49 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
|
ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
|
||||||
allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
|
allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
|
||||||
|
|
||||||
|
Configuration configuration = loadConf();
|
||||||
|
|
||||||
|
UserGroupInformation.setConfiguration(configuration);
|
||||||
|
|
||||||
|
// Don't do secure setup if it's not needed.
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
// Validate config so that we don't get an NPE
|
||||||
|
checkForProperty(conf, KEYTAB_PROPERTY_KEY);
|
||||||
|
checkForProperty(conf, USERNAME_PROPERTY_KEY);
|
||||||
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
fileSystem = FileSystem.get(new URI(basePath.toString()),
|
// Login as whoever we're supposed to be and let the hostname be pulled
|
||||||
new Configuration());
|
// from localhost. If security isn't enabled, this does nothing.
|
||||||
} catch (URISyntaxException ex) {
|
SecurityUtil.login(configuration, conf.getString(KEYTAB_PROPERTY_KEY),
|
||||||
throw new MetricsException("The supplied filesystem base path URI"
|
conf.getString(USERNAME_PROPERTY_KEY));
|
||||||
+ " is not a valid URI: " + basePath.toString(), ex);
|
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
throw new MetricsException("Error connecting to file system: "
|
throw new MetricsException("Error logging in securely: ["
|
||||||
+ basePath + " [" + ex.toString() + "]", ex);
|
+ ex.toString() + "]", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fileSystem = getFileSystem(configuration);
|
||||||
|
|
||||||
|
// This step isn't strictly necessary, but it makes debugging issues much
|
||||||
|
// easier. We try to create the base directory eagerly and fail with
|
||||||
|
// copious debug info if it fails.
|
||||||
|
try {
|
||||||
|
fileSystem.mkdirs(basePath);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new MetricsException("Failed to create " + basePath + "["
|
||||||
|
+ SOURCE_KEY + "=" + source + ", "
|
||||||
|
+ IGNORE_ERROR_KEY + "=" + ignoreError + ", "
|
||||||
|
+ ALLOW_APPEND_KEY + "=" + allowAppend + ", "
|
||||||
|
+ KEYTAB_PROPERTY_KEY + "="
|
||||||
|
+ conf.getString(KEYTAB_PROPERTY_KEY) + ", "
|
||||||
|
+ conf.getString(KEYTAB_PROPERTY_KEY) + "="
|
||||||
|
+ configuration.get(conf.getString(KEYTAB_PROPERTY_KEY)) + ", "
|
||||||
|
+ USERNAME_PROPERTY_KEY + "="
|
||||||
|
+ conf.getString(USERNAME_PROPERTY_KEY) + ", "
|
||||||
|
+ conf.getString(USERNAME_PROPERTY_KEY) + "="
|
||||||
|
+ configuration.get(conf.getString(USERNAME_PROPERTY_KEY))
|
||||||
|
+ "] -- " + ex.toString(), ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we're permitted to append, check if we actually can
|
// If we're permitted to append, check if we actually can
|
||||||
|
@ -165,6 +222,67 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
flushTimer = new Timer("RollingFileSystemSink Flusher", true);
|
flushTimer = new Timer("RollingFileSystemSink Flusher", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Throw a {@link MetricsException} if the given property is not set.
|
||||||
|
*
|
||||||
|
* @param conf the configuration to test
|
||||||
|
* @param key the key to validate
|
||||||
|
*/
|
||||||
|
private static void checkForProperty(SubsetConfiguration conf, String key) {
|
||||||
|
if (!conf.containsKey(key)) {
|
||||||
|
throw new MetricsException("Configuration is missing " + key
|
||||||
|
+ " property");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the supplied configuration for testing or otherwise load a new
|
||||||
|
* configuration.
|
||||||
|
*
|
||||||
|
* @return the configuration to use
|
||||||
|
*/
|
||||||
|
private Configuration loadConf() {
|
||||||
|
Configuration conf;
|
||||||
|
|
||||||
|
if (suppliedConf != null) {
|
||||||
|
conf = suppliedConf;
|
||||||
|
} else {
|
||||||
|
// The config we're handed in init() isn't the one we want here, so we
|
||||||
|
// create a new one to pick up the full settings.
|
||||||
|
conf = new Configuration();
|
||||||
|
}
|
||||||
|
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the supplied file system for testing or otherwise get a new file
|
||||||
|
* system.
|
||||||
|
*
|
||||||
|
* @param conf the configuration
|
||||||
|
* @return the file system to use
|
||||||
|
* @throws MetricsException thrown if the file system could not be retrieved
|
||||||
|
*/
|
||||||
|
private FileSystem getFileSystem(Configuration conf) throws MetricsException {
|
||||||
|
FileSystem fs = null;
|
||||||
|
|
||||||
|
if (suppliedFilesystem != null) {
|
||||||
|
fs = suppliedFilesystem;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
fs = FileSystem.get(new URI(basePath.toString()), conf);
|
||||||
|
} catch (URISyntaxException ex) {
|
||||||
|
throw new MetricsException("The supplied filesystem base path URI"
|
||||||
|
+ " is not a valid URI: " + basePath.toString(), ex);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new MetricsException("Error connecting to file system: "
|
||||||
|
+ basePath + " [" + ex.toString() + "]", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test whether the file system supports append and return the answer.
|
* Test whether the file system supports append and return the answer.
|
||||||
* @param fs the target file system
|
* @param fs the target file system
|
||||||
|
@ -232,7 +350,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
|
|
||||||
next.setTime(now);
|
next.setTime(now);
|
||||||
|
|
||||||
if (isTest) {
|
if (flushQuickly) {
|
||||||
// If we're running unit tests, flush after a short pause
|
// If we're running unit tests, flush after a short pause
|
||||||
next.add(Calendar.MILLISECOND, 400);
|
next.add(Calendar.MILLISECOND, 400);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -32,18 +32,18 @@ import java.util.Calendar;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.configuration.SubsetConfiguration;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.metrics2.MetricsException;
|
import org.apache.hadoop.metrics2.MetricsException;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||||
|
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
@ -66,9 +66,11 @@ import static org.junit.Assert.assertTrue;
|
||||||
* methods for classes that extend it.
|
* methods for classes that extend it.
|
||||||
*/
|
*/
|
||||||
public class RollingFileSystemSinkTestBase {
|
public class RollingFileSystemSinkTestBase {
|
||||||
|
protected static final String SINK_PRINCIPAL_KEY = "rfssink.principal";
|
||||||
|
protected static final String SINK_KEYTAB_FILE_KEY = "rfssink.keytab";
|
||||||
protected static final File ROOT_TEST_DIR =
|
protected static final File ROOT_TEST_DIR =
|
||||||
new File(System.getProperty("test.build.data", "target/"),
|
new File(System.getProperty("test.build.data", "target/test"),
|
||||||
"FileSystemSinkTest");
|
"RollingFileSystemSinkTest");
|
||||||
protected static final SimpleDateFormat DATE_FORMAT =
|
protected static final SimpleDateFormat DATE_FORMAT =
|
||||||
new SimpleDateFormat("yyyyMMddHH");
|
new SimpleDateFormat("yyyyMMddHH");
|
||||||
protected static File methodDir;
|
protected static File methodDir;
|
||||||
|
@ -118,8 +120,9 @@ public class RollingFileSystemSinkTestBase {
|
||||||
* Set the date format's timezone to GMT.
|
* Set the date format's timezone to GMT.
|
||||||
*/
|
*/
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setTZ() {
|
public static void setup() {
|
||||||
DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
|
DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
|
||||||
|
FileUtil.fullyDelete(ROOT_TEST_DIR);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -128,7 +131,7 @@ public class RollingFileSystemSinkTestBase {
|
||||||
*/
|
*/
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void deleteBaseDir() throws IOException {
|
public static void deleteBaseDir() throws IOException {
|
||||||
FileUtils.deleteDirectory(ROOT_TEST_DIR);
|
FileUtil.fullyDelete(ROOT_TEST_DIR);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -139,11 +142,14 @@ public class RollingFileSystemSinkTestBase {
|
||||||
public void createMethodDir() throws IOException {
|
public void createMethodDir() throws IOException {
|
||||||
methodDir = new File(ROOT_TEST_DIR, methodName.getMethodName());
|
methodDir = new File(ROOT_TEST_DIR, methodName.getMethodName());
|
||||||
|
|
||||||
methodDir.mkdirs();
|
assertTrue("Test directory already exists: " + methodDir,
|
||||||
|
methodDir.mkdirs());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up the metrics system, start it, and return it.
|
* Set up the metrics system, start it, and return it. The principal and
|
||||||
|
* keytab properties will not be set.
|
||||||
|
*
|
||||||
* @param path the base path for the sink
|
* @param path the base path for the sink
|
||||||
* @param ignoreErrors whether the sink should ignore errors
|
* @param ignoreErrors whether the sink should ignore errors
|
||||||
* @param allowAppend whether the sink is allowed to append to existing files
|
* @param allowAppend whether the sink is allowed to append to existing files
|
||||||
|
@ -151,18 +157,37 @@ public class RollingFileSystemSinkTestBase {
|
||||||
*/
|
*/
|
||||||
protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
|
protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
|
||||||
boolean allowAppend) {
|
boolean allowAppend) {
|
||||||
|
return initMetricsSystem(path, ignoreErrors, allowAppend, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up the metrics system, start it, and return it.
|
||||||
|
* @param path the base path for the sink
|
||||||
|
* @param ignoreErrors whether the sink should ignore errors
|
||||||
|
* @param allowAppend whether the sink is allowed to append to existing files
|
||||||
|
* @param useSecureParams whether to set the principal and keytab properties
|
||||||
|
* @return the org.apache.hadoop.metrics2.MetricsSystem
|
||||||
|
*/
|
||||||
|
protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
|
||||||
|
boolean allowAppend, boolean useSecureParams) {
|
||||||
// If the prefix is not lower case, the metrics system won't be able to
|
// If the prefix is not lower case, the metrics system won't be able to
|
||||||
// read any of the properties.
|
// read any of the properties.
|
||||||
final String prefix = methodName.getMethodName().toLowerCase();
|
String prefix = methodName.getMethodName().toLowerCase();
|
||||||
|
|
||||||
new ConfigBuilder().add("*.period", 10000)
|
ConfigBuilder builder = new ConfigBuilder().add("*.period", 10000)
|
||||||
.add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
|
.add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
|
||||||
.add(prefix + ".sink.mysink0.basepath", path)
|
.add(prefix + ".sink.mysink0.basepath", path)
|
||||||
.add(prefix + ".sink.mysink0.source", "testsrc")
|
.add(prefix + ".sink.mysink0.source", "testsrc")
|
||||||
.add(prefix + ".sink.mysink0.context", "test1")
|
.add(prefix + ".sink.mysink0.context", "test1")
|
||||||
.add(prefix + ".sink.mysink0.ignore-error", ignoreErrors)
|
.add(prefix + ".sink.mysink0.ignore-error", ignoreErrors)
|
||||||
.add(prefix + ".sink.mysink0.allow-append", allowAppend)
|
.add(prefix + ".sink.mysink0.allow-append", allowAppend);
|
||||||
.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix));
|
|
||||||
|
if (useSecureParams) {
|
||||||
|
builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY)
|
||||||
|
.add(prefix + ".sink.mysink0.principal-key", SINK_PRINCIPAL_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix));
|
||||||
|
|
||||||
MetricsSystemImpl ms = new MetricsSystemImpl(prefix);
|
MetricsSystemImpl ms = new MetricsSystemImpl(prefix);
|
||||||
|
|
||||||
|
@ -481,6 +506,17 @@ public class RollingFileSystemSinkTestBase {
|
||||||
public static class ErrorSink extends RollingFileSystemSink {
|
public static class ErrorSink extends RollingFileSystemSink {
|
||||||
public static volatile boolean errored = false;
|
public static volatile boolean errored = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(SubsetConfiguration conf) {
|
||||||
|
try {
|
||||||
|
super.init(conf);
|
||||||
|
} catch (MetricsException ex) {
|
||||||
|
errored = true;
|
||||||
|
|
||||||
|
throw new MetricsException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void putMetrics(MetricsRecord record) {
|
public void putMetrics(MetricsRecord record) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -64,6 +64,9 @@ Release 2.9.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-9637. Tests for RollingFileSystemSink. (Daniel Templeton via kasha)
|
HDFS-9637. Tests for RollingFileSystemSink. (Daniel Templeton via kasha)
|
||||||
|
|
||||||
|
HDFS-9780. RollingFileSystemSink doesn't work on secure clusters.
|
||||||
|
(Daniel Templeton via kasha)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.metrics2.sink;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -29,12 +31,10 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.metrics2.MetricsException;
|
import org.apache.hadoop.metrics2.MetricsException;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
|
import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import org.junit.Before;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the {@link RollingFileSystemSink} class in the context of HDFS.
|
* Test the {@link RollingFileSystemSink} class in the context of HDFS.
|
||||||
|
@ -60,7 +60,7 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
||||||
|
|
||||||
// Also clear sink flags
|
// Also clear sink flags
|
||||||
RollingFileSystemSink.isTest = false;
|
RollingFileSystemSink.flushQuickly = false;
|
||||||
RollingFileSystemSink.hasFlushed = false;
|
RollingFileSystemSink.hasFlushed = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -256,8 +256,7 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testFlushThread() throws Exception {
|
public void testFlushThread() throws Exception {
|
||||||
RollingFileSystemSink.isTest = true;
|
RollingFileSystemSink.flushQuickly = true;
|
||||||
RollingFileSystemSink.hasFlushed = false;
|
|
||||||
|
|
||||||
String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
||||||
MetricsSystem ms = initMetricsSystem(path, true, false);
|
MetricsSystem ms = initMetricsSystem(path, true, false);
|
||||||
|
|
|
@ -0,0 +1,283 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.sink;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.Properties;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||||
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
|
import org.apache.hadoop.security.NullGroupsMapping;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||||
|
import org.junit.Test;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the {@link RollingFileSystemSink} class in the context of HDFS with
|
||||||
|
* Kerberos enabled.
|
||||||
|
*/
|
||||||
|
public class TestRollingFileSystemSinkWithSecureHdfs
|
||||||
|
extends RollingFileSystemSinkTestBase {
|
||||||
|
private static final int NUM_DATANODES = 4;
|
||||||
|
private static MiniKdc kdc;
|
||||||
|
private static String sinkPrincipal;
|
||||||
|
private static String sinkKeytab;
|
||||||
|
private static String hdfsPrincipal;
|
||||||
|
private static String hdfsKeytab;
|
||||||
|
private static String spnegoPrincipal;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do a basic write test against an HDFS cluster with Kerberos enabled. We
|
||||||
|
* assume that if a basic write succeeds, more complex operations will also
|
||||||
|
* succeed.
|
||||||
|
*
|
||||||
|
* @throws Exception thrown if things break
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWithSecureHDFS() throws Exception {
|
||||||
|
RollingFileSystemSink.flushQuickly = false;
|
||||||
|
RollingFileSystemSink.hasFlushed = false;
|
||||||
|
initKdc();
|
||||||
|
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
HdfsConfiguration conf = createSecureConfig("authentication,privacy");
|
||||||
|
|
||||||
|
RollingFileSystemSink.suppliedConf = conf;
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
UserGroupInformation sink = createDirectoriesSecurely(cluster);
|
||||||
|
final String path =
|
||||||
|
"hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
|
||||||
|
final MetricsSystem ms = initMetricsSystem(path, true, false, true);
|
||||||
|
|
||||||
|
assertMetricsContents(
|
||||||
|
sink.doAs(new PrivilegedExceptionAction<String>() {
|
||||||
|
@Override
|
||||||
|
public String run() throws Exception {
|
||||||
|
return doWriteTest(ms, path, 1);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
shutdownKdc();
|
||||||
|
|
||||||
|
// Restore non-secure conf
|
||||||
|
UserGroupInformation.setConfiguration(new Configuration());
|
||||||
|
RollingFileSystemSink.suppliedConf = null;
|
||||||
|
RollingFileSystemSink.suppliedFilesystem = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do a basic write test against an HDFS cluster with Kerberos enabled but
|
||||||
|
* without the principal and keytab properties set.
|
||||||
|
*
|
||||||
|
* @throws Exception thrown if things break
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMissingPropertiesWithSecureHDFS() throws Exception {
|
||||||
|
RollingFileSystemSink.flushQuickly = false;
|
||||||
|
RollingFileSystemSink.hasFlushed = false;
|
||||||
|
initKdc();
|
||||||
|
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
HdfsConfiguration conf = createSecureConfig("authentication,privacy");
|
||||||
|
|
||||||
|
RollingFileSystemSink.suppliedConf = conf;
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final String path =
|
||||||
|
"hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
|
||||||
|
|
||||||
|
createDirectoriesSecurely(cluster);
|
||||||
|
initMetricsSystem(path, true, false);
|
||||||
|
|
||||||
|
assertTrue("No exception was generated initializing the sink against a "
|
||||||
|
+ "secure cluster even though the principal and keytab properties "
|
||||||
|
+ "were missing", ErrorSink.errored);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
shutdownKdc();
|
||||||
|
|
||||||
|
// Restore non-secure conf
|
||||||
|
UserGroupInformation.setConfiguration(new Configuration());
|
||||||
|
RollingFileSystemSink.suppliedConf = null;
|
||||||
|
RollingFileSystemSink.suppliedFilesystem = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the /tmp directory as <i>hdfs</i> and /tmp/test as <i>sink</i> and
|
||||||
|
* return the UGI for <i>sink</i>.
|
||||||
|
*
|
||||||
|
* @param cluster the mini-cluster
|
||||||
|
* @return the UGI for <i>sink</i>
|
||||||
|
* @throws IOException thrown if login or directory creation fails
|
||||||
|
* @throws InterruptedException thrown if interrupted while creating a
|
||||||
|
* file system handle
|
||||||
|
*/
|
||||||
|
protected UserGroupInformation createDirectoriesSecurely(final MiniDFSCluster cluster)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Path tmp = new Path("/tmp");
|
||||||
|
Path test = new Path(tmp, "test");
|
||||||
|
|
||||||
|
UserGroupInformation hdfs =
|
||||||
|
UserGroupInformation.loginUserFromKeytabAndReturnUGI(hdfsPrincipal,
|
||||||
|
hdfsKeytab);
|
||||||
|
FileSystem fsForSuperUser =
|
||||||
|
hdfs.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||||
|
@Override
|
||||||
|
public FileSystem run() throws Exception {
|
||||||
|
return cluster.getFileSystem();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
fsForSuperUser.mkdirs(tmp);
|
||||||
|
fsForSuperUser.setPermission(tmp, new FsPermission((short)0777));
|
||||||
|
|
||||||
|
UserGroupInformation sink =
|
||||||
|
UserGroupInformation.loginUserFromKeytabAndReturnUGI(sinkPrincipal,
|
||||||
|
sinkKeytab);
|
||||||
|
FileSystem fsForSink =
|
||||||
|
sink.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||||
|
@Override
|
||||||
|
public FileSystem run() throws Exception {
|
||||||
|
return cluster.getFileSystem();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
fsForSink.mkdirs(test);
|
||||||
|
RollingFileSystemSink.suppliedFilesystem = fsForSink;
|
||||||
|
|
||||||
|
return sink;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setup the KDC for testing a secure HDFS cluster
|
||||||
|
*
|
||||||
|
* @throws Exception thrown if the KDC setup fails
|
||||||
|
*/
|
||||||
|
public static void initKdc() throws Exception {
|
||||||
|
Properties kdcConf = MiniKdc.createConf();
|
||||||
|
kdc = new MiniKdc(kdcConf, methodDir);
|
||||||
|
kdc.start();
|
||||||
|
|
||||||
|
File sinkKeytabFile = new File(methodDir, "sink.keytab");
|
||||||
|
sinkKeytab = sinkKeytabFile.getAbsolutePath();
|
||||||
|
kdc.createPrincipal(sinkKeytabFile, "sink/localhost");
|
||||||
|
sinkPrincipal = "sink/localhost@" + kdc.getRealm();
|
||||||
|
|
||||||
|
File hdfsKeytabFile = new File(methodDir, "hdfs.keytab");
|
||||||
|
hdfsKeytab = hdfsKeytabFile.getAbsolutePath();
|
||||||
|
kdc.createPrincipal(hdfsKeytabFile, "hdfs/localhost",
|
||||||
|
"HTTP/localhost");
|
||||||
|
hdfsPrincipal = "hdfs/localhost@" + kdc.getRealm();
|
||||||
|
spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop the mini-KDC.
|
||||||
|
*/
|
||||||
|
public static void shutdownKdc() {
|
||||||
|
if (kdc != null) {
|
||||||
|
kdc.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a configuration for starting a secure cluster.
|
||||||
|
*
|
||||||
|
* @param dataTransferProtection supported QOPs
|
||||||
|
* @return configuration for starting a secure cluster
|
||||||
|
* @throws Exception if there is any failure
|
||||||
|
*/
|
||||||
|
protected HdfsConfiguration createSecureConfig(
|
||||||
|
String dataTransferProtection) throws Exception {
|
||||||
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
|
SecurityUtil.setAuthenticationMethod(
|
||||||
|
UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
|
||||||
|
conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
|
||||||
|
conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, hdfsKeytab);
|
||||||
|
conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
|
||||||
|
conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, hdfsKeytab);
|
||||||
|
conf.set(SINK_PRINCIPAL_KEY, sinkPrincipal);
|
||||||
|
conf.set(SINK_KEYTAB_FILE_KEY, sinkKeytab);
|
||||||
|
conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
|
||||||
|
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||||
|
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);
|
||||||
|
conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
|
||||||
|
conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||||
|
conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||||
|
conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_GROUP_MAPPING,
|
||||||
|
NullGroupsMapping.class.getName());
|
||||||
|
|
||||||
|
String keystoresDir = methodDir.getAbsolutePath();
|
||||||
|
String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());
|
||||||
|
|
||||||
|
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
|
||||||
|
conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
|
||||||
|
KeyStoreTestUtil.getClientSSLConfigFileName());
|
||||||
|
conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
|
||||||
|
KeyStoreTestUtil.getServerSSLConfigFileName());
|
||||||
|
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue