diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 3dde2cc9c15..40d0df36991 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -107,12 +107,12 @@ public class DNConf { final long heartBeatInterval; private final long lifelineIntervalMs; volatile long blockReportInterval; - final long blockReportSplitThreshold; + volatile long blockReportSplitThreshold; final boolean peerStatsEnabled; final boolean diskStatsEnabled; final long outliersReportIntervalMs; final long ibrInterval; - final long initialBlockReportDelayMs; + volatile long initialBlockReportDelayMs; volatile long cacheReportInterval; final long datanodeSlowIoWarningThresholdMs; @@ -215,19 +215,7 @@ public class DNConf { this.datanodeSlowIoWarningThresholdMs = getConf().getLong( DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT); - - long initBRDelay = getConf().getTimeDuration( - DFS_BLOCKREPORT_INITIAL_DELAY_KEY, - DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT, - TimeUnit.SECONDS, TimeUnit.MILLISECONDS); - if (initBRDelay >= blockReportInterval) { - initBRDelay = 0; - DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY + " is " - + "greater than or equal to" + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY - + ". Setting initial delay to 0 msec:"); - } - initialBlockReportDelayMs = initBRDelay; - + initBlockReportDelay(); heartBeatInterval = getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS); @@ -311,6 +299,19 @@ public class DNConf { ); } + private void initBlockReportDelay() { + long initBRDelay = getConf().getTimeDuration( + DFS_BLOCKREPORT_INITIAL_DELAY_KEY, + DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS); + if (initBRDelay >= blockReportInterval || initBRDelay < 0) { + initBRDelay = 0; + DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY + + " is greater than or equal to " + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY + + ". Setting initial delay to 0 msec."); + } + initialBlockReportDelayMs = initBRDelay; + } + // We get minimumNameNodeVersion via a method so it can be mocked out in tests. String getMinimumNameNodeVersion() { return this.minimumNameNodeVersion; @@ -477,7 +478,8 @@ public class DNConf { } void setBlockReportInterval(long intervalMs) { - Preconditions.checkArgument(intervalMs > 0); + Preconditions.checkArgument(intervalMs > 0, + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY + " should be larger than 0"); blockReportInterval = intervalMs; } @@ -487,11 +489,22 @@ public class DNConf { void setCacheReportInterval(long intervalMs) { Preconditions.checkArgument(intervalMs > 0, - "dfs.cachereport.intervalMsec should be larger than 0"); + DFS_CACHEREPORT_INTERVAL_MSEC_KEY + " should be larger than 0"); cacheReportInterval = intervalMs; } public long getCacheReportInterval() { return cacheReportInterval; } + + void setBlockReportSplitThreshold(long threshold) { + Preconditions.checkArgument(threshold >= 0, + DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY + " should be larger than or equal to 0"); + blockReportSplitThreshold = threshold; + } + + void setInitBRDelayMs(String delayMs) { + dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs); + initBlockReportDelay(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 3c86b1189e9..e37dce6e5e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -18,10 +18,14 @@ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; @@ -312,6 +316,8 @@ public class DataNode extends ReconfigurableBase DFS_DATANODE_DATA_DIR_KEY, DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, + DFS_BLOCKREPORT_INITIAL_DELAY_KEY, DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, DFS_CACHEREPORT_INTERVAL_MSEC_KEY)); @@ -619,39 +625,10 @@ public class DataNode extends ReconfigurableBase } break; } - case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: { - ReconfigurationException rootException = null; - try { - LOG.info("Reconfiguring {} to {}", property, newVal); - long intervalMs; - if (newVal == null) { - // Set to default. - intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; - } else { - intervalMs = Long.parseLong(newVal); - } - dnConf.setBlockReportInterval(intervalMs); - for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { - if (bpos != null) { - for (BPServiceActor actor : bpos.getBPServiceActors()) { - actor.getScheduler().setBlockReportIntervalMs(intervalMs); - } - } - } - return Long.toString(intervalMs); - } catch (IllegalArgumentException e) { - rootException = new ReconfigurationException( - property, newVal, getConf().get(property), e); - } finally { - if (rootException != null) { - LOG.warn(String.format( - "Exception in updating block report interval %s to %s", - property, newVal), rootException); - throw rootException; - } - } - break; - } + case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: + case DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY: + case DFS_BLOCKREPORT_INITIAL_DELAY_KEY: + return reconfBlockReportParameters(property, newVal); case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY: return reconfDataXceiverParameters(property, newVal); case DFS_CACHEREPORT_INTERVAL_MSEC_KEY: @@ -697,6 +674,44 @@ public class DataNode extends ReconfigurableBase } } + private String reconfBlockReportParameters(String property, String newVal) + throws ReconfigurationException { + String result = null; + try { + LOG.info("Reconfiguring {} to {}", property, newVal); + if (property.equals(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)) { + Preconditions.checkNotNull(dnConf, "DNConf has not been initialized."); + long intervalMs = newVal == null ? DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT : + Long.parseLong(newVal); + result = Long.toString(intervalMs); + dnConf.setBlockReportInterval(intervalMs); + for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { + if (bpos != null) { + for (BPServiceActor actor : bpos.getBPServiceActors()) { + actor.getScheduler().setBlockReportIntervalMs(intervalMs); + } + } + } + } else if (property.equals(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY)) { + Preconditions.checkNotNull(dnConf, "DNConf has not been initialized."); + long threshold = newVal == null ? DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT : + Long.parseLong(newVal); + result = Long.toString(threshold); + dnConf.setBlockReportSplitThreshold(threshold); + } else if (property.equals(DFS_BLOCKREPORT_INITIAL_DELAY_KEY)) { + Preconditions.checkNotNull(dnConf, "DNConf has not been initialized."); + int initialDelay = newVal == null ? DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT : + Integer.parseInt(newVal); + result = Integer.toString(initialDelay); + dnConf.setInitBRDelayMs(result); + } + LOG.info("RECONFIGURE* changed {} to {}", property, newVal); + return result; + } catch (IllegalArgumentException e) { + throw new ReconfigurationException(property, newVal, getConf().get(property), e); + } + } + /** * Get a list of the keys of the re-configurable properties in configuration. */ @@ -3823,4 +3838,9 @@ public class DataNode extends ReconfigurableBase return (stage == PIPELINE_SETUP_STREAMING_RECOVERY || stage == PIPELINE_SETUP_APPEND_RECOVERY); } + + @VisibleForTesting + public BlockPoolManager getBlockPoolManager() { + return blockPoolManager; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java index e0be9e57202..75f1ee9bbb5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY; @@ -303,40 +305,49 @@ public class TestDataNodeReconfiguration { @Test public void testBlockReportIntervalReconfiguration() - throws ReconfigurationException, IOException { + throws ReconfigurationException { int blockReportInterval = 300 * 1000; + String[] blockReportParameters = { + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, + DFS_BLOCKREPORT_INITIAL_DELAY_KEY}; + for (int i = 0; i < NUM_DATA_NODE; i++) { DataNode dn = cluster.getDataNodes().get(i); + BlockPoolManager blockPoolManager = dn.getBlockPoolManager(); // Try invalid values. - try { - dn.reconfigureProperty( - DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "text"); - fail("ReconfigurationException expected"); - } catch (ReconfigurationException expected) { - assertTrue("expecting NumberFormatException", - expected.getCause() instanceof NumberFormatException); + for (String blockReportParameter : blockReportParameters) { + try { + dn.reconfigureProperty(blockReportParameter, "text"); + fail("ReconfigurationException expected"); + } catch (ReconfigurationException expected) { + assertTrue("expecting NumberFormatException", + expected.getCause() instanceof NumberFormatException); + } } + try { - dn.reconfigureProperty( - DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, - String.valueOf(-1)); + dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, String.valueOf(-1)); fail("ReconfigurationException expected"); } catch (ReconfigurationException expected) { assertTrue("expecting IllegalArgumentException", expected.getCause() instanceof IllegalArgumentException); } + try { + dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(-1)); + fail("ReconfigurationException expected"); + } catch (ReconfigurationException expected) { + assertTrue("expecting IllegalArgumentException", + expected.getCause() instanceof IllegalArgumentException); + } + dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, String.valueOf(-1)); + assertEquals(0, dn.getDnConf().initialBlockReportDelayMs); - // Change properties. + // Change properties and verify the change. dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, String.valueOf(blockReportInterval)); - - // Verify change. - assertEquals(String.format("%s has wrong value", - DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), - blockReportInterval, - dn.getDnConf().getBlockReportInterval()); - for (BPOfferService bpos : dn.getAllBpOs()) { + for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { if (bpos != null) { for (BPServiceActor actor : bpos.getBPServiceActors()) { assertEquals(String.format("%s has wrong value", @@ -347,15 +358,15 @@ public class TestDataNodeReconfiguration { } } - // Revert to default. - dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, - null); - assertEquals(String.format("%s has wrong value", - DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), - DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT, - dn.getDnConf().getBlockReportInterval()); - // Verify default. - for (BPOfferService bpos : dn.getAllBpOs()) { + dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(123)); + assertEquals(123, dn.getDnConf().blockReportSplitThreshold); + + dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "123"); + assertEquals(123000, dn.getDnConf().initialBlockReportDelayMs); + + // Revert to default and verify default. + dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, null); + for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { if (bpos != null) { for (BPServiceActor actor : bpos.getBPServiceActors()) { assertEquals(String.format("%s has wrong value", @@ -365,9 +376,16 @@ public class TestDataNodeReconfiguration { } } } - assertEquals(String.format("expect %s is not configured", - DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), null, dn - .getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)); + assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), + dn.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)); + + dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, null); + assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY), + dn.getConf().get(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY)); + + dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, null); + assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INITIAL_DELAY_KEY), + dn.getConf().get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index d22162b871f..0f16fd771f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -330,7 +330,7 @@ public class TestDFSAdmin { final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("datanode", address, outs, errs); - assertEquals(6, outs.size()); + assertEquals(8, outs.size()); assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1)); }