HDFS-9858. RollingFileSystemSink can throw an NPE on non-secure clusters. (Daniel Templeton via kasha)
This commit is contained in:
parent
b2951f9fbc
commit
c2460dad64
|
@ -132,6 +132,9 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
private static final FastDateFormat DATE_FORMAT =
|
private static final FastDateFormat DATE_FORMAT =
|
||||||
FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
|
FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
|
||||||
private final Object lock = new Object();
|
private final Object lock = new Object();
|
||||||
|
private boolean initialized = false;
|
||||||
|
private SubsetConfiguration properties;
|
||||||
|
private Configuration conf;
|
||||||
private String source;
|
private String source;
|
||||||
private boolean ignoreError;
|
private boolean ignoreError;
|
||||||
private boolean allowAppend;
|
private boolean allowAppend;
|
||||||
|
@ -163,57 +166,62 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
protected static FileSystem suppliedFilesystem = null;
|
protected static FileSystem suppliedFilesystem = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(SubsetConfiguration conf) {
|
public void init(SubsetConfiguration metrics2Properties) {
|
||||||
basePath = new Path(conf.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
|
properties = metrics2Properties;
|
||||||
source = conf.getString(SOURCE_KEY, SOURCE_DEFAULT);
|
basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
|
||||||
ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
|
source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT);
|
||||||
allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
|
ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false);
|
||||||
|
allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false);
|
||||||
|
|
||||||
Configuration configuration = loadConf();
|
conf = loadConf();
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
UserGroupInformation.setConfiguration(configuration);
|
|
||||||
|
|
||||||
// Don't do secure setup if it's not needed.
|
// Don't do secure setup if it's not needed.
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
// Validate config so that we don't get an NPE
|
// Validate config so that we don't get an NPE
|
||||||
checkForProperty(conf, KEYTAB_PROPERTY_KEY);
|
checkForProperty(properties, KEYTAB_PROPERTY_KEY);
|
||||||
checkForProperty(conf, USERNAME_PROPERTY_KEY);
|
checkForProperty(properties, USERNAME_PROPERTY_KEY);
|
||||||
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Login as whoever we're supposed to be and let the hostname be pulled
|
// Login as whoever we're supposed to be and let the hostname be pulled
|
||||||
// from localhost. If security isn't enabled, this does nothing.
|
// from localhost. If security isn't enabled, this does nothing.
|
||||||
SecurityUtil.login(configuration, conf.getString(KEYTAB_PROPERTY_KEY),
|
SecurityUtil.login(conf, properties.getString(KEYTAB_PROPERTY_KEY),
|
||||||
conf.getString(USERNAME_PROPERTY_KEY));
|
properties.getString(USERNAME_PROPERTY_KEY));
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
throw new MetricsException("Error logging in securely: ["
|
throw new MetricsException("Error logging in securely: ["
|
||||||
+ ex.toString() + "]", ex);
|
+ ex.toString() + "]", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fileSystem = getFileSystem(configuration);
|
/**
|
||||||
|
* Initialize the connection to HDFS and create the base directory. Also
|
||||||
|
* launch the flush thread.
|
||||||
|
*/
|
||||||
|
private boolean initFs() {
|
||||||
|
boolean success = false;
|
||||||
|
|
||||||
|
fileSystem = getFileSystem();
|
||||||
|
|
||||||
// This step isn't strictly necessary, but it makes debugging issues much
|
// This step isn't strictly necessary, but it makes debugging issues much
|
||||||
// easier. We try to create the base directory eagerly and fail with
|
// easier. We try to create the base directory eagerly and fail with
|
||||||
// copious debug info if it fails.
|
// copious debug info if it fails.
|
||||||
try {
|
try {
|
||||||
fileSystem.mkdirs(basePath);
|
fileSystem.mkdirs(basePath);
|
||||||
|
success = true;
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
|
if (!ignoreError) {
|
||||||
throw new MetricsException("Failed to create " + basePath + "["
|
throw new MetricsException("Failed to create " + basePath + "["
|
||||||
+ SOURCE_KEY + "=" + source + ", "
|
+ SOURCE_KEY + "=" + source + ", "
|
||||||
+ IGNORE_ERROR_KEY + "=" + ignoreError + ", "
|
|
||||||
+ ALLOW_APPEND_KEY + "=" + allowAppend + ", "
|
+ ALLOW_APPEND_KEY + "=" + allowAppend + ", "
|
||||||
+ KEYTAB_PROPERTY_KEY + "="
|
+ stringifySecurityProperty(KEYTAB_PROPERTY_KEY) + ", "
|
||||||
+ conf.getString(KEYTAB_PROPERTY_KEY) + ", "
|
+ stringifySecurityProperty(USERNAME_PROPERTY_KEY)
|
||||||
+ conf.getString(KEYTAB_PROPERTY_KEY) + "="
|
|
||||||
+ configuration.get(conf.getString(KEYTAB_PROPERTY_KEY)) + ", "
|
|
||||||
+ USERNAME_PROPERTY_KEY + "="
|
|
||||||
+ conf.getString(USERNAME_PROPERTY_KEY) + ", "
|
|
||||||
+ conf.getString(USERNAME_PROPERTY_KEY) + "="
|
|
||||||
+ configuration.get(conf.getString(USERNAME_PROPERTY_KEY))
|
|
||||||
+ "] -- " + ex.toString(), ex);
|
+ "] -- " + ex.toString(), ex);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (success) {
|
||||||
// If we're permitted to append, check if we actually can
|
// If we're permitted to append, check if we actually can
|
||||||
if (allowAppend) {
|
if (allowAppend) {
|
||||||
allowAppend = checkAppend(fileSystem);
|
allowAppend = checkAppend(fileSystem);
|
||||||
|
@ -222,6 +230,40 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
flushTimer = new Timer("RollingFileSystemSink Flusher", true);
|
flushTimer = new Timer("RollingFileSystemSink Flusher", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Turn a security property into a nicely formatted set of <i>name=value</i>
|
||||||
|
* strings, allowing for either the property or the configuration not to be
|
||||||
|
* set.
|
||||||
|
*
|
||||||
|
* @param properties the sink properties
|
||||||
|
* @param conf the conf
|
||||||
|
* @param property the property to stringify
|
||||||
|
* @return the stringified property
|
||||||
|
*/
|
||||||
|
private String stringifySecurityProperty(String property) {
|
||||||
|
String securityProperty;
|
||||||
|
|
||||||
|
if (properties.containsKey(property)) {
|
||||||
|
String propertyValue = properties.getString(property);
|
||||||
|
String confValue = conf.get(properties.getString(property));
|
||||||
|
|
||||||
|
if (confValue != null) {
|
||||||
|
securityProperty = property + "=" + propertyValue
|
||||||
|
+ ", " + properties.getString(property) + "=" + confValue;
|
||||||
|
} else {
|
||||||
|
securityProperty = property + "=" + propertyValue
|
||||||
|
+ ", " + properties.getString(property) + "=<NOT SET>";
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
securityProperty = property + "=<NOT SET>";
|
||||||
|
}
|
||||||
|
|
||||||
|
return securityProperty;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Throw a {@link MetricsException} if the given property is not set.
|
* Throw a {@link MetricsException} if the given property is not set.
|
||||||
*
|
*
|
||||||
|
@ -242,17 +284,17 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
* @return the configuration to use
|
* @return the configuration to use
|
||||||
*/
|
*/
|
||||||
private Configuration loadConf() {
|
private Configuration loadConf() {
|
||||||
Configuration conf;
|
Configuration c;
|
||||||
|
|
||||||
if (suppliedConf != null) {
|
if (suppliedConf != null) {
|
||||||
conf = suppliedConf;
|
c = suppliedConf;
|
||||||
} else {
|
} else {
|
||||||
// The config we're handed in init() isn't the one we want here, so we
|
// The config we're handed in init() isn't the one we want here, so we
|
||||||
// create a new one to pick up the full settings.
|
// create a new one to pick up the full settings.
|
||||||
conf = new Configuration();
|
c = new Configuration();
|
||||||
}
|
}
|
||||||
|
|
||||||
return conf;
|
return c;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -263,7 +305,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
* @return the file system to use
|
* @return the file system to use
|
||||||
* @throws MetricsException thrown if the file system could not be retrieved
|
* @throws MetricsException thrown if the file system could not be retrieved
|
||||||
*/
|
*/
|
||||||
private FileSystem getFileSystem(Configuration conf) throws MetricsException {
|
private FileSystem getFileSystem() throws MetricsException {
|
||||||
FileSystem fs = null;
|
FileSystem fs = null;
|
||||||
|
|
||||||
if (suppliedFilesystem != null) {
|
if (suppliedFilesystem != null) {
|
||||||
|
@ -317,6 +359,12 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
// because if currentDirPath is null, then currentOutStream is null, but
|
// because if currentDirPath is null, then currentOutStream is null, but
|
||||||
// currentOutStream can be null for other reasons.
|
// currentOutStream can be null for other reasons.
|
||||||
if ((currentOutStream == null) || !path.equals(currentDirPath)) {
|
if ((currentOutStream == null) || !path.equals(currentDirPath)) {
|
||||||
|
// If we're not yet connected to HDFS, create the connection
|
||||||
|
if (!initialized) {
|
||||||
|
initialized = initFs();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (initialized) {
|
||||||
// Close the stream. This step could have been handled already by the
|
// Close the stream. This step could have been handled already by the
|
||||||
// flusher thread, but if it has, the PrintStream will just swallow the
|
// flusher thread, but if it has, the PrintStream will just swallow the
|
||||||
// exception, which is fine.
|
// exception, which is fine.
|
||||||
|
@ -335,6 +383,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||||
scheduleFlush(now);
|
scheduleFlush(now);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedule the current hour's directory to be flushed at the top of the next
|
* Schedule the current hour's directory to be flushed at the top of the next
|
||||||
|
|
|
@ -175,7 +175,7 @@ public class RollingFileSystemSinkTestBase {
|
||||||
String prefix = methodName.getMethodName().toLowerCase();
|
String prefix = methodName.getMethodName().toLowerCase();
|
||||||
|
|
||||||
ConfigBuilder builder = new ConfigBuilder().add("*.period", 10000)
|
ConfigBuilder builder = new ConfigBuilder().add("*.period", 10000)
|
||||||
.add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
|
.add(prefix + ".sink.mysink0.class", MockSink.class.getName())
|
||||||
.add(prefix + ".sink.mysink0.basepath", path)
|
.add(prefix + ".sink.mysink0.basepath", path)
|
||||||
.add(prefix + ".sink.mysink0.source", "testsrc")
|
.add(prefix + ".sink.mysink0.source", "testsrc")
|
||||||
.add(prefix + ".sink.mysink0.context", "test1")
|
.add(prefix + ".sink.mysink0.context", "test1")
|
||||||
|
@ -503,8 +503,9 @@ public class RollingFileSystemSinkTestBase {
|
||||||
* This class is a {@link RollingFileSystemSink} wrapper that tracks whether
|
* This class is a {@link RollingFileSystemSink} wrapper that tracks whether
|
||||||
* an exception has been thrown during operations.
|
* an exception has been thrown during operations.
|
||||||
*/
|
*/
|
||||||
public static class ErrorSink extends RollingFileSystemSink {
|
public static class MockSink extends RollingFileSystemSink {
|
||||||
public static volatile boolean errored = false;
|
public static volatile boolean errored = false;
|
||||||
|
public static volatile boolean initialized = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(SubsetConfiguration conf) {
|
public void init(SubsetConfiguration conf) {
|
||||||
|
@ -515,6 +516,8 @@ public class RollingFileSystemSinkTestBase {
|
||||||
|
|
||||||
throw new MetricsException(ex);
|
throw new MetricsException(ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
initialized = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the {@link RollingFileSystemSink} class in the context of the local file
|
* Test the {@link RollingFileSystemSink} class in the context of the local file
|
||||||
|
@ -106,7 +108,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
|
||||||
new MyMetrics1().registerWith(ms);
|
new MyMetrics1().registerWith(ms);
|
||||||
|
|
||||||
methodDir.setWritable(false);
|
methodDir.setWritable(false);
|
||||||
ErrorSink.errored = false;
|
MockSink.errored = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// publish the metrics
|
// publish the metrics
|
||||||
|
@ -114,7 +116,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
|
||||||
|
|
||||||
assertTrue("No exception was generated while writing metrics "
|
assertTrue("No exception was generated while writing metrics "
|
||||||
+ "even though the target directory was not writable",
|
+ "even though the target directory was not writable",
|
||||||
ErrorSink.errored);
|
MockSink.errored);
|
||||||
|
|
||||||
ms.stop();
|
ms.stop();
|
||||||
ms.shutdown();
|
ms.shutdown();
|
||||||
|
@ -135,7 +137,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
|
||||||
new MyMetrics1().registerWith(ms);
|
new MyMetrics1().registerWith(ms);
|
||||||
|
|
||||||
methodDir.setWritable(false);
|
methodDir.setWritable(false);
|
||||||
ErrorSink.errored = false;
|
MockSink.errored = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// publish the metrics
|
// publish the metrics
|
||||||
|
@ -144,7 +146,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
|
||||||
assertFalse("An exception was generated while writing metrics "
|
assertFalse("An exception was generated while writing metrics "
|
||||||
+ "when the target directory was not writable, even though the "
|
+ "when the target directory was not writable, even though the "
|
||||||
+ "sink is set to ignore errors",
|
+ "sink is set to ignore errors",
|
||||||
ErrorSink.errored);
|
MockSink.errored);
|
||||||
|
|
||||||
ms.stop();
|
ms.stop();
|
||||||
ms.shutdown();
|
ms.shutdown();
|
||||||
|
|
|
@ -1047,6 +1047,9 @@ Release 2.9.0 - UNRELEASED
|
||||||
HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages.
|
HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages.
|
||||||
(Wei Zhou via wang)
|
(Wei Zhou via wang)
|
||||||
|
|
||||||
|
HDFS-9858. RollingFileSystemSink can throw an NPE on non-secure clusters.
|
||||||
|
(Daniel Templeton via kasha)
|
||||||
|
|
||||||
Release 2.8.0 - UNRELEASED
|
Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
|
@ -151,12 +151,12 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
new MyMetrics1().registerWith(ms);
|
new MyMetrics1().registerWith(ms);
|
||||||
|
|
||||||
shutdownHdfs();
|
shutdownHdfs();
|
||||||
ErrorSink.errored = false;
|
MockSink.errored = false;
|
||||||
|
|
||||||
ms.publishMetricsNow(); // publish the metrics
|
ms.publishMetricsNow(); // publish the metrics
|
||||||
|
|
||||||
assertTrue("No exception was generated while writing metrics "
|
assertTrue("No exception was generated while writing metrics "
|
||||||
+ "even though HDFS was unavailable", ErrorSink.errored);
|
+ "even though HDFS was unavailable", MockSink.errored);
|
||||||
|
|
||||||
ms.stop();
|
ms.stop();
|
||||||
ms.shutdown();
|
ms.shutdown();
|
||||||
|
@ -178,12 +178,12 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
ms.publishMetricsNow(); // publish the metrics
|
ms.publishMetricsNow(); // publish the metrics
|
||||||
|
|
||||||
shutdownHdfs();
|
shutdownHdfs();
|
||||||
ErrorSink.errored = false;
|
MockSink.errored = false;
|
||||||
|
|
||||||
ms.stop();
|
ms.stop();
|
||||||
|
|
||||||
assertTrue("No exception was generated while stopping sink "
|
assertTrue("No exception was generated while stopping sink "
|
||||||
+ "even though HDFS was unavailable", ErrorSink.errored);
|
+ "even though HDFS was unavailable", MockSink.errored);
|
||||||
|
|
||||||
ms.shutdown();
|
ms.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -203,13 +203,13 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
new MyMetrics1().registerWith(ms);
|
new MyMetrics1().registerWith(ms);
|
||||||
|
|
||||||
shutdownHdfs();
|
shutdownHdfs();
|
||||||
ErrorSink.errored = false;
|
MockSink.errored = false;
|
||||||
|
|
||||||
ms.publishMetricsNow(); // publish the metrics
|
ms.publishMetricsNow(); // publish the metrics
|
||||||
|
|
||||||
assertFalse("An exception was generated writing metrics "
|
assertFalse("An exception was generated writing metrics "
|
||||||
+ "while HDFS was unavailable, even though the sink is set to "
|
+ "while HDFS was unavailable, even though the sink is set to "
|
||||||
+ "ignore errors", ErrorSink.errored);
|
+ "ignore errors", MockSink.errored);
|
||||||
|
|
||||||
ms.stop();
|
ms.stop();
|
||||||
ms.shutdown();
|
ms.shutdown();
|
||||||
|
@ -231,13 +231,13 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
ms.publishMetricsNow(); // publish the metrics
|
ms.publishMetricsNow(); // publish the metrics
|
||||||
|
|
||||||
shutdownHdfs();
|
shutdownHdfs();
|
||||||
ErrorSink.errored = false;
|
MockSink.errored = false;
|
||||||
|
|
||||||
ms.stop();
|
ms.stop();
|
||||||
|
|
||||||
assertFalse("An exception was generated stopping sink "
|
assertFalse("An exception was generated stopping sink "
|
||||||
+ "while HDFS was unavailable, even though the sink is set to "
|
+ "while HDFS was unavailable, even though the sink is set to "
|
||||||
+ "ignore errors", ErrorSink.errored);
|
+ "ignore errors", MockSink.errored);
|
||||||
|
|
||||||
ms.shutdown();
|
ms.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -283,4 +283,22 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
|
|
||||||
ms.stop();
|
ms.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that a failure to connect to HDFS does not cause the init() method
|
||||||
|
* to fail.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testInitWithNoHDFS() {
|
||||||
|
String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
||||||
|
|
||||||
|
shutdownHdfs();
|
||||||
|
MockSink.errored = false;
|
||||||
|
initMetricsSystem(path, true, false);
|
||||||
|
|
||||||
|
assertTrue("The sink was not initialized as expected",
|
||||||
|
MockSink.initialized);
|
||||||
|
assertFalse("The sink threw an unexpected error on initialization",
|
||||||
|
MockSink.errored);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the {@link RollingFileSystemSink} class in the context of HDFS with
|
* Test the {@link RollingFileSystemSink} class in the context of HDFS with
|
||||||
|
@ -147,7 +148,7 @@ public class TestRollingFileSystemSinkWithSecureHdfs
|
||||||
|
|
||||||
assertTrue("No exception was generated initializing the sink against a "
|
assertTrue("No exception was generated initializing the sink against a "
|
||||||
+ "secure cluster even though the principal and keytab properties "
|
+ "secure cluster even though the principal and keytab properties "
|
||||||
+ "were missing", ErrorSink.errored);
|
+ "were missing", MockSink.errored);
|
||||||
} finally {
|
} finally {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
Loading…
Reference in New Issue