diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index bc2141d986a..2947b727305 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -383,7 +383,7 @@ public class BlockManager implements BlockStatsMXBean { final int maxCorruptFilesReturned; final float blocksInvalidateWorkPct; - final int blocksReplWorkMultiplier; + private int blocksReplWorkMultiplier; // whether or not to issue block encryption keys. final boolean encryptDataTransfer; @@ -897,11 +897,78 @@ public class BlockManager implements BlockStatsMXBean { out.println(""); } - /** @return maxReplicationStreams */ + /** Returns the current setting for maxReplicationStreams, which is set by + * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY}. + * + * @return maxReplicationStreams + */ public int getMaxReplicationStreams() { return maxReplicationStreams; } + static private void ensurePositiveInt(int val, String key) { + Preconditions.checkArgument( + (val > 0), + key + " = '" + val + "' is invalid. " + + "It should be a positive, non-zero integer value."); + } + + /** + * Updates the value used for maxReplicationStreams, which is set by + * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} initially. + * + * @param newVal - Must be a positive non-zero integer. + */ + public void setMaxReplicationStreams(int newVal) { + ensurePositiveInt(newVal, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY); + maxReplicationStreams = newVal; + } + + /** Returns the current setting for maxReplicationStreamsHardLimit, set by + * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY}. + * + * @return maxReplicationStreamsHardLimit + */ + public int getReplicationStreamsHardLimit() { + return replicationStreamsHardLimit; + } + + /** + * Updates the value used for replicationStreamsHardLimit, which is set by + * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY} + * initially. + * + * @param newVal - Must be a positive non-zero integer. + */ + public void setReplicationStreamsHardLimit(int newVal) { + ensurePositiveInt(newVal, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY); + replicationStreamsHardLimit = newVal; + } + + /** Returns the current setting for blocksReplWorkMultiplier, set by + * {@code DFSConfigKeys. + * DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION}. + * + * @return maxReplicationStreamsHardLimit + */ + public int getBlocksReplWorkMultiplier() { + return blocksReplWorkMultiplier; + } + + /** + * Updates the value used for blocksReplWorkMultiplier, set by + * {@code DFSConfigKeys. + * DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION} initially. + * @param newVal - Must be a positive non-zero integer. + */ + public void setBlocksReplWorkMultiplier(int newVal) { + ensurePositiveInt(newVal, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION); + blocksReplWorkMultiplier = newVal; + } + public int getDefaultStorageNum(BlockInfo block) { switch (block.getBlockType()) { case STRIPED: return ((BlockInfoStriped) block).getRealTotalBlockNum(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 1b4f7704d37..e4c88563732 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap; import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; @@ -165,6 +166,13 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.FS_PROTECTED_DIRECTORIES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT; + import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ToolRunner.confirmPrompt; import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE; @@ -299,7 +307,10 @@ public class NameNode extends ReconfigurableBase implements DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, FS_PROTECTED_DIRECTORIES, HADOOP_CALLER_CONTEXT_ENABLED_KEY, - DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)); + DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, + DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, + DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, + DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)); private static final String USAGE = "Usage: hdfs namenode [" + StartupOption.BACKUP.getName() + "] | \n\t[" @@ -2125,12 +2136,63 @@ public class NameNode extends ReconfigurableBase implements return reconfigureIPCBackoffEnabled(newVal); } else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) { return reconfigureSPSModeEvent(newVal, property); + } else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY) + || property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY) + || property.equals( + DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) { + return reconfReplicationParameters(newVal, property); } else { throw new ReconfigurationException(property, newVal, getConf().get( property)); } } + private String reconfReplicationParameters(final String newVal, + final String property) throws ReconfigurationException { + BlockManager bm = namesystem.getBlockManager(); + int newSetting; + namesystem.writeLock(); + try { + if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)) { + bm.setMaxReplicationStreams( + adjustNewVal(DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT, newVal)); + newSetting = bm.getMaxReplicationStreams(); + } else if (property.equals( + DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)) { + bm.setReplicationStreamsHardLimit( + adjustNewVal(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT, + newVal)); + newSetting = bm.getReplicationStreamsHardLimit(); + } else if ( + property.equals( + DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) { + bm.setBlocksReplWorkMultiplier( + adjustNewVal( + DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT, + newVal)); + newSetting = bm.getBlocksReplWorkMultiplier(); + } else { + throw new IllegalArgumentException("Unexpected property " + + property + "in reconfReplicationParameters"); + } + LOG.info("RECONFIGURE* changed {} to {}", property, newSetting); + return String.valueOf(newSetting); + } catch (IllegalArgumentException e) { + throw new ReconfigurationException(property, newVal, getConf().get( + property), e); + } finally { + namesystem.writeUnlock(); + } + } + + private int adjustNewVal(int defaultVal, String newVal) { + if (newVal == null) { + return defaultVal; + } else { + return Integer.parseInt(newVal); + } + } + private String reconfHeartbeatInterval(final DatanodeManager datanodeManager, final String property, final String newVal) throws ReconfigurationException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java new file mode 100644 index 00000000000..8dc81f8c1a2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import java.io.IOException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * This class tests the replication related parameters in the namenode can + * be refreshed dynamically, without a namenode restart. + */ +public class TestRefreshNamenodeReplicationConfig { + private MiniDFSCluster cluster = null; + private BlockManager bm; + + @Before + public void setup() throws IOException { + Configuration config = new Configuration(); + config.setInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 8); + config.setInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 10); + config.setInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, + 12); + + cluster = new MiniDFSCluster.Builder(config) + .nnTopology(MiniDFSNNTopology.simpleSingleNN(0, 0)) + .numDataNodes(0).build(); + cluster.waitActive(); + bm = cluster.getNameNode().getNamesystem().getBlockManager(); + } + + @After + public void teardown() throws IOException { + cluster.shutdown(); + } + + /** + * Tests to ensure each of the block replication parameters can be passed + * updated successfully. + */ + @Test(timeout = 90000) + public void testParamsCanBeReconfigured() throws ReconfigurationException { + + assertEquals(8, bm.getMaxReplicationStreams()); + assertEquals(10, bm.getReplicationStreamsHardLimit()); + assertEquals(12, bm.getBlocksReplWorkMultiplier()); + + cluster.getNameNode().reconfigurePropertyImpl( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, "20"); + cluster.getNameNode().reconfigurePropertyImpl( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, + "22"); + cluster.getNameNode().reconfigurePropertyImpl( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, + "24"); + + assertEquals(20, bm.getMaxReplicationStreams()); + assertEquals(22, bm.getReplicationStreamsHardLimit()); + assertEquals(24, bm.getBlocksReplWorkMultiplier()); + } + + /** + * Tests to ensure reconfiguration fails with a negative, zero or string value + * value for each parameter. + */ + @Test(timeout = 90000) + public void testReconfigureFailsWithInvalidValues() throws Exception { + String[] keys = new String[]{ + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION + }; + + // Ensure we cannot set any of the parameters negative + for (String key : keys) { + ReconfigurationException e = + LambdaTestUtils.intercept(ReconfigurationException.class, + () -> cluster.getNameNode().reconfigurePropertyImpl(key, "-20")); + assertTrue(e.getCause() instanceof IllegalArgumentException); + assertEquals(key+" = '-20' is invalid. It should be a " + +"positive, non-zero integer value.", e.getCause().getMessage()); + } + // Ensure none of the values were updated from the defaults + assertEquals(8, bm.getMaxReplicationStreams()); + assertEquals(10, bm.getReplicationStreamsHardLimit()); + assertEquals(12, bm.getBlocksReplWorkMultiplier()); + + for (String key : keys) { + ReconfigurationException e = + LambdaTestUtils.intercept(ReconfigurationException.class, + () -> cluster.getNameNode().reconfigurePropertyImpl(key, "0")); + assertTrue(e.getCause() instanceof IllegalArgumentException); + assertEquals(key+" = '0' is invalid. It should be a " + +"positive, non-zero integer value.", e.getCause().getMessage()); + } + + // Ensure none of the values were updated from the defaults + assertEquals(8, bm.getMaxReplicationStreams()); + assertEquals(10, bm.getReplicationStreamsHardLimit()); + assertEquals(12, bm.getBlocksReplWorkMultiplier()); + + // Ensure none of the parameters can be set to a string value + for (String key : keys) { + ReconfigurationException e = + LambdaTestUtils.intercept(ReconfigurationException.class, + () -> cluster.getNameNode().reconfigurePropertyImpl(key, "str")); + assertTrue(e.getCause() instanceof NumberFormatException); + } + + // Ensure none of the values were updated from the defaults + assertEquals(8, bm.getMaxReplicationStreams()); + assertEquals(10, bm.getReplicationStreamsHardLimit()); + assertEquals(12, bm.getBlocksReplWorkMultiplier()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 063217bf925..90d0761a8e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -394,7 +394,7 @@ public class TestDFSAdmin { final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("namenode", address, outs, errs); - assertEquals(7, outs.size()); + assertEquals(10, outs.size()); assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1)); assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2)); assertEquals(errs.size(), 0);