From c2460dad642feee1086442d33c30c24ec77236b9 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Thu, 25 Feb 2016 16:31:01 -0800 Subject: [PATCH] HDFS-9858. RollingFileSystemSink can throw an NPE on non-secure clusters. (Daniel Templeton via kasha) --- .../metrics2/sink/RollingFileSystemSink.java | 143 ++++++++++++------ .../sink/RollingFileSystemSinkTestBase.java | 7 +- .../sink/TestRollingFileSystemSink.java | 10 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../TestRollingFileSystemSinkWithHdfs.java | 34 ++++- ...stRollingFileSystemSinkWithSecureHdfs.java | 3 +- 6 files changed, 138 insertions(+), 62 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java index 2c0a26a43d7..9a439013e6e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java @@ -132,6 +132,9 @@ public class RollingFileSystemSink implements MetricsSink, Closeable { private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT")); private final Object lock = new Object(); + private boolean initialized = false; + private SubsetConfiguration properties; + private Configuration conf; private String source; private boolean ignoreError; private boolean allowAppend; @@ -163,63 +166,102 @@ public class RollingFileSystemSink implements MetricsSink, Closeable { protected static FileSystem suppliedFilesystem = null; @Override - public void init(SubsetConfiguration conf) { - basePath = new Path(conf.getString(BASEPATH_KEY, BASEPATH_DEFAULT)); - source = conf.getString(SOURCE_KEY, SOURCE_DEFAULT); - ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false); - allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false); + public void init(SubsetConfiguration metrics2Properties) { + properties = metrics2Properties; + basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT)); + source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT); + ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false); + allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false); - Configuration configuration = loadConf(); - - UserGroupInformation.setConfiguration(configuration); + conf = loadConf(); + UserGroupInformation.setConfiguration(conf); // Don't do secure setup if it's not needed. if (UserGroupInformation.isSecurityEnabled()) { // Validate config so that we don't get an NPE - checkForProperty(conf, KEYTAB_PROPERTY_KEY); - checkForProperty(conf, USERNAME_PROPERTY_KEY); + checkForProperty(properties, KEYTAB_PROPERTY_KEY); + checkForProperty(properties, USERNAME_PROPERTY_KEY); try { // Login as whoever we're supposed to be and let the hostname be pulled // from localhost. If security isn't enabled, this does nothing. - SecurityUtil.login(configuration, conf.getString(KEYTAB_PROPERTY_KEY), - conf.getString(USERNAME_PROPERTY_KEY)); + SecurityUtil.login(conf, properties.getString(KEYTAB_PROPERTY_KEY), + properties.getString(USERNAME_PROPERTY_KEY)); } catch (IOException ex) { throw new MetricsException("Error logging in securely: [" + ex.toString() + "]", ex); } } + } - fileSystem = getFileSystem(configuration); + /** + * Initialize the connection to HDFS and create the base directory. 
Also
+   * launch the flush thread.
+   */
+  private boolean initFs() {
+    boolean success = false;
+
+    fileSystem = getFileSystem();
 
     // This step isn't strictly necessary, but it makes debugging issues much
     // easier. We try to create the base directory eagerly and fail with
     // copious debug info if it fails.
     try {
       fileSystem.mkdirs(basePath);
+      success = true;
     } catch (Exception ex) {
-      throw new MetricsException("Failed to create " + basePath + "["
-          + SOURCE_KEY + "=" + source + ", "
-          + IGNORE_ERROR_KEY + "=" + ignoreError + ", "
-          + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
-          + KEYTAB_PROPERTY_KEY + "="
-          + conf.getString(KEYTAB_PROPERTY_KEY) + ", "
-          + conf.getString(KEYTAB_PROPERTY_KEY) + "="
-          + configuration.get(conf.getString(KEYTAB_PROPERTY_KEY)) + ", "
-          + USERNAME_PROPERTY_KEY + "="
-          + conf.getString(USERNAME_PROPERTY_KEY) + ", "
-          + conf.getString(USERNAME_PROPERTY_KEY) + "="
-          + configuration.get(conf.getString(USERNAME_PROPERTY_KEY))
-          + "] -- " + ex.toString(), ex);
+      if (!ignoreError) {
+        throw new MetricsException("Failed to create " + basePath + "["
+            + SOURCE_KEY + "=" + source + ", "
+            + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
+            + stringifySecurityProperty(KEYTAB_PROPERTY_KEY) + ", "
+            + stringifySecurityProperty(USERNAME_PROPERTY_KEY)
+            + "] -- " + ex.toString(), ex);
+      }
     }
 
-    // If we're permitted to append, check if we actually can
-    if (allowAppend) {
-      allowAppend = checkAppend(fileSystem);
+    if (success) {
+      // If we're permitted to append, check if we actually can
+      if (allowAppend) {
+        allowAppend = checkAppend(fileSystem);
+      }
+
+      flushTimer = new Timer("RollingFileSystemSink Flusher", true);
     }
 
-    flushTimer = new Timer("RollingFileSystemSink Flusher", true);
+    return success;
+  }
+
+  /**
+   * Turn a security property into a nicely formatted set of name=value
+   * strings, allowing for either the property or the configuration not to be
+   * set. The property is looked up in the sink properties, and its value is
+   * treated as a configuration key that is resolved against the loaded
+   * Hadoop configuration.
+   *
+   * @param property the property to stringify
+   * @return the stringified property
+   */
+  private String stringifySecurityProperty(String property) {
+    String securityProperty;
+
+    if (properties.containsKey(property)) {
+      String propertyValue = properties.getString(property);
+      String confValue = conf.get(properties.getString(property));
+
+      if (confValue != null) {
+        securityProperty = property + "=" + propertyValue
+            + ", " + properties.getString(property) + "=" + confValue;
+      } else {
+        securityProperty = property + "=" + propertyValue
+            + ", " + properties.getString(property) + "=";
+      }
+    } else {
+      securityProperty = property + "=";
+    }
+
+    return securityProperty;
   }
 
   /**
@@ -242,17 +284,17 @@ private static void checkForProperty(SubsetConfiguration conf, String key) {
    * @return the configuration to use
    */
   private Configuration loadConf() {
-    Configuration conf;
+    Configuration c;
 
     if (suppliedConf != null) {
-      conf = suppliedConf;
+      c = suppliedConf;
     } else {
       // The config we're handed in init() isn't the one we want here, so we
       // create a new one to pick up the full settings.
- conf = new Configuration(); + c = new Configuration(); } - return conf; + return c; } /** @@ -263,7 +305,7 @@ private Configuration loadConf() { * @return the file system to use * @throws MetricsException thrown if the file system could not be retrieved */ - private FileSystem getFileSystem(Configuration conf) throws MetricsException { + private FileSystem getFileSystem() throws MetricsException { FileSystem fs = null; if (suppliedFilesystem != null) { @@ -317,22 +359,29 @@ private void rollLogDirIfNeeded() throws MetricsException { // because if currentDirPath is null, then currentOutStream is null, but // currentOutStream can be null for other reasons. if ((currentOutStream == null) || !path.equals(currentDirPath)) { - // Close the stream. This step could have been handled already by the - // flusher thread, but if it has, the PrintStream will just swallow the - // exception, which is fine. - if (currentOutStream != null) { - currentOutStream.close(); + // If we're not yet connected to HDFS, create the connection + if (!initialized) { + initialized = initFs(); } - currentDirPath = path; + if (initialized) { + // Close the stream. This step could have been handled already by the + // flusher thread, but if it has, the PrintStream will just swallow the + // exception, which is fine. + if (currentOutStream != null) { + currentOutStream.close(); + } - try { - rollLogDir(); - } catch (IOException ex) { - throwMetricsException("Failed to create new log file", ex); + currentDirPath = path; + + try { + rollLogDir(); + } catch (IOException ex) { + throwMetricsException("Failed to create new log file", ex); + } + + scheduleFlush(now); } - - scheduleFlush(now); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java index f1ad058d0a1..9914c5e30f3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java @@ -175,7 +175,7 @@ protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors, String prefix = methodName.getMethodName().toLowerCase(); ConfigBuilder builder = new ConfigBuilder().add("*.period", 10000) - .add(prefix + ".sink.mysink0.class", ErrorSink.class.getName()) + .add(prefix + ".sink.mysink0.class", MockSink.class.getName()) .add(prefix + ".sink.mysink0.basepath", path) .add(prefix + ".sink.mysink0.source", "testsrc") .add(prefix + ".sink.mysink0.context", "test1") @@ -503,8 +503,9 @@ public void assertFileCount(FileSystem fs, Path dir, int expected) * This class is a {@link RollingFileSystemSink} wrapper that tracks whether * an exception has been thrown during operations. 
   */
-  public static class ErrorSink extends RollingFileSystemSink {
+  public static class MockSink extends RollingFileSystemSink {
     public static volatile boolean errored = false;
+    public static volatile boolean initialized = false;
 
     @Override
     public void init(SubsetConfiguration conf) {
@@ -515,6 +516,8 @@ public void init(SubsetConfiguration conf) {
         throw new MetricsException(ex);
       }
+
+      initialized = true;
     }
 
     @Override
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
index da632351be2..3c6cd27b59b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
@@ -106,7 +106,7 @@ public void testFailedWrite() {
     new MyMetrics1().registerWith(ms);
 
     methodDir.setWritable(false);
-    ErrorSink.errored = false;
+    MockSink.errored = false;
 
     try {
       // publish the metrics
@@ -114,7 +114,7 @@
 
       assertTrue("No exception was generated while writing metrics "
           + "even though the target directory was not writable",
-          ErrorSink.errored);
+          MockSink.errored);
 
       ms.stop();
       ms.shutdown();
@@ -135,7 +135,7 @@ public void testSilentFailedWrite() {
     new MyMetrics1().registerWith(ms);
 
     methodDir.setWritable(false);
-    ErrorSink.errored = false;
+    MockSink.errored = false;
 
     try {
       // publish the metrics
@@ -144,7 +144,7 @@
 
       assertFalse("An exception was generated while writing metrics "
           + "when the target directory was not writable, even though the "
          + "sink is set to ignore errors",
-          ErrorSink.errored);
+          MockSink.errored);
 
       ms.stop();
       ms.shutdown();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e3990eaa31d..104b46ddc3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1047,6 +1047,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages. (Wei
     Zhou via wang)
 
+    HDFS-9858. RollingFileSystemSink can throw an NPE on non-secure clusters.
+ (Daniel Templeton via kasha) + Release 2.8.0 - UNRELEASED NEW FEATURES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java index 0f3725b60df..9984b34fbff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java @@ -151,12 +151,12 @@ public void testFailedWrite() throws IOException { new MyMetrics1().registerWith(ms); shutdownHdfs(); - ErrorSink.errored = false; + MockSink.errored = false; ms.publishMetricsNow(); // publish the metrics assertTrue("No exception was generated while writing metrics " - + "even though HDFS was unavailable", ErrorSink.errored); + + "even though HDFS was unavailable", MockSink.errored); ms.stop(); ms.shutdown(); @@ -178,12 +178,12 @@ public void testFailedClose() throws IOException { ms.publishMetricsNow(); // publish the metrics shutdownHdfs(); - ErrorSink.errored = false; + MockSink.errored = false; ms.stop(); assertTrue("No exception was generated while stopping sink " - + "even though HDFS was unavailable", ErrorSink.errored); + + "even though HDFS was unavailable", MockSink.errored); ms.shutdown(); } @@ -203,13 +203,13 @@ public void testSilentFailedWrite() throws IOException, InterruptedException { new MyMetrics1().registerWith(ms); shutdownHdfs(); - ErrorSink.errored = false; + MockSink.errored = false; ms.publishMetricsNow(); // publish the metrics assertFalse("An exception was generated writing metrics " + "while HDFS was unavailable, even though the sink is set to " - + "ignore errors", ErrorSink.errored); + + "ignore errors", MockSink.errored); ms.stop(); ms.shutdown(); @@ -231,13 +231,13 @@ public void testSilentFailedClose() throws IOException { ms.publishMetricsNow(); // publish the metrics shutdownHdfs(); - ErrorSink.errored = false; + MockSink.errored = false; ms.stop(); assertFalse("An exception was generated stopping sink " + "while HDFS was unavailable, even though the sink is set to " - + "ignore errors", ErrorSink.errored); + + "ignore errors", MockSink.errored); ms.shutdown(); } @@ -283,4 +283,22 @@ public void testFlushThread() throws Exception { ms.stop(); } + + /** + * Test that a failure to connect to HDFS does not cause the init() method + * to fail. 
+ */ + @Test + public void testInitWithNoHDFS() { + String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; + + shutdownHdfs(); + MockSink.errored = false; + initMetricsSystem(path, true, false); + + assertTrue("The sink was not initialized as expected", + MockSink.initialized); + assertFalse("The sink threw an unexpected error on initialization", + MockSink.errored); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java index 12b71f89a58..dce4fdcf229 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java @@ -51,6 +51,7 @@ import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.junit.Test; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertTrue; /** * Test the {@link RollingFileSystemSink} class in the context of HDFS with @@ -147,7 +148,7 @@ public void testMissingPropertiesWithSecureHDFS() throws Exception { assertTrue("No exception was generated initializing the sink against a " + "secure cluster even though the principal and keytab properties " - + "were missing", ErrorSink.errored); + + "were missing", MockSink.errored); } finally { if (cluster != null) { cluster.shutdown();