From ff3a88b9c27d49fddd005f2c263811dd7462b60a Mon Sep 17 00:00:00 2001
From: Wei-Chiu Chuang
Date: Wed, 26 Jan 2022 11:54:13 +0800
Subject: [PATCH] HDFS-16423. Balancer should not get blocks on stale storages
 (#3883) (#3924)

Reviewed-by: litao
Signed-off-by: Takanobu Asanuma
(cherry picked from commit db2c3200e63a4377331643eddedcab79d4a51468)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

Co-authored-by: qinyuren <1476659627@qq.com>
---
 .../server/blockmanagement/BlockManager.java  | 13 +++-
 .../blockmanagement/DatanodeDescriptor.java   | 11 ++++
 .../blockmanagement/DatanodeStorageInfo.java  |  5 ++
 .../org/apache/hadoop/hdfs/TestGetBlocks.java | 64 +++++++++++++++++++
 .../server/balancer/TestBalancerService.java  |  2 +
 .../balancer/TestBalancerWithHANameNodes.java | 37 ++++++++++-
 6 files changed, 128 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 48ac5ed6ead..ea95eca8bcf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1642,9 +1642,16 @@ public class BlockManager implements BlockStatsMXBean {
     if(numBlocks == 0) {
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
+
+    // skip stale storage
+    DatanodeStorageInfo[] storageInfos = Arrays
+        .stream(node.getStorageInfos())
+        .filter(s -> !s.areBlockContentsStale())
+        .toArray(DatanodeStorageInfo[]::new);
+
     // starting from a random block
     int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
-    Iterator<BlockInfo> iter = node.getBlockIterator(startBlock);
+    Iterator<BlockInfo> iter = node.getBlockIterator(startBlock, storageInfos);
     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
     long totalSize = 0;
     BlockInfo curBlock;
@@ -1657,8 +1664,8 @@ public class BlockManager implements BlockStatsMXBean {
       totalSize += addBlock(curBlock, results);
     }
     if(totalSize<size) {
-      iter = node.getBlockIterator(); // start from the beginning
+      iter = node.getBlockIterator(0, storageInfos); // start from the beginning
       for(int i=0; i<startBlock&&totalSize<size; i++) {
         curBlock = iter.next();
         if(!curBlock.isComplete())  continue;
         if (curBlock.getNumBytes() < minBlockSize) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ ... @@ public class DatanodeDescriptor extends DatanodeInfo {
   Iterator<BlockInfo> getBlockIterator(final int startBlock) {
     return new BlockIterator(startBlock, getStorageInfos());
   }
 
+  /**
+   * Get iterator, which starts iterating from the specified block and storages.
+   *
+   * @param startBlock on which blocks are start iterating
+   * @param storageInfos specified storages
+   */
+  Iterator<BlockInfo> getBlockIterator(
+      final int startBlock, final DatanodeStorageInfo[] storageInfos) {
+    return new BlockIterator(startBlock, storageInfos);
+  }
+
   @VisibleForTesting
   public void incrementPendingReplicationWithoutTargets() {
     pendingReplicationWithoutTargets++;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index e8fcef2018c..e3a19fb4191 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -168,6 +168,11 @@ public class DatanodeStorageInfo {
     return blockContentsStale;
   }
 
+  @VisibleForTesting
+  public void setBlockContentsStale(boolean value) {
+    blockContentsStale = value;
+  }
+
   void markStaleAfterFailover() {
     heartbeatedSinceFailover = false;
     blockContentsStale = true;
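[Editor's note: the three hunks above are the functional core of HDFS-16423 -- getBlocksWithLocations() now filters out storages whose block contents are stale before iterating their blocks, so the Balancer no longer plans moves from storages that have not yet sent a full block report. Below is a minimal standalone sketch of that filter pattern; the Storage class is a hypothetical stand-in for DatanodeStorageInfo, not Hadoop code.]

    import java.util.Arrays;

    public class SkipStaleSketch {
      /** Hypothetical stand-in for DatanodeStorageInfo. */
      static class Storage {
        private final boolean stale;
        Storage(boolean stale) { this.stale = stale; }
        boolean areBlockContentsStale() { return stale; }
      }

      public static void main(String[] args) {
        Storage[] all = { new Storage(false), new Storage(true), new Storage(false) };
        // Same stream-filter-toArray shape as the BlockManager hunk above:
        // drop stale storages before handing the array to a block iterator.
        Storage[] fresh = Arrays.stream(all)
            .filter(s -> !s.areBlockContentsStale())
            .toArray(Storage[]::new);
        System.out.println(fresh.length); // prints 2
      }
    }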
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
index e82b990a4e8..5f087557c6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@@ -396,4 +397,67 @@ public class TestGetBlocks {
     }
   }
 
+  @Test
+  public void testReadSkipStaleStorage() throws Exception {
+    final short repFactor = (short) 1;
+    final int blockNum = 64;
+    final int storageNum = 2;
+    final int fileLen = BLOCK_SIZE * blockNum;
+    final Path path = new Path("testReadSkipStaleStorage");
+    final Configuration conf = new HdfsConfiguration();
+
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .storagesPerDatanode(storageNum)
+        .build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, path, false, 1024, fileLen,
+        BLOCK_SIZE, repFactor, 0, true);
+
+    // get datanode info
+    ClientProtocol client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(),
+        ClientProtocol.class).getProxy();
+    DatanodeInfo[] dataNodes = client.getDatanodeReport(DatanodeReportType.ALL);
+
+    // get storage info
+    BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
+    DatanodeStorageInfo[] storageInfos = bm0.getDatanodeManager()
+        .getDatanode(dataNodes[0].getDatanodeUuid()).getStorageInfos();
+
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+        cluster.getNameNodePort());
+    NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
+        DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
+
+    // check blocks count equals to blockNum
+    BlockWithLocations[] blocks = namenode.getBlocks(
+        dataNodes[0], fileLen*2, 0).getBlocks();
+    assertEquals(blockNum, blocks.length);
+
+    // calculate the block count on storage[0]
+    int count = 0;
+    for (BlockWithLocations b : blocks) {
+      for (String s : b.getStorageIDs()) {
+        if (s.equals(storageInfos[0].getStorageID())) {
+          count++;
+        }
+      }
+    }
+
+    // set storage[0] stale
+    storageInfos[0].setBlockContentsStale(true);
+    blocks = namenode.getBlocks(
+        dataNodes[0], fileLen*2, 0).getBlocks();
+    assertEquals(blockNum - count, blocks.length);
+
+    // set all storage stale
+    bm0.getDatanodeManager().markAllDatanodesStale();
+    blocks = namenode.getBlocks(
+        dataNodes[0], fileLen*2, 0).getBlocks();
+    assertEquals(0, blocks.length);
+  }
 }
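[Editor's note: the test's three assertions follow one invariant -- getBlocks() returns exactly the block replicas held on non-stale storages. The second argument (fileLen*2) asks for more bytes than the datanode actually stores, so the byte budget never truncates the result, and the third argument (0) appears to disable the minimum-block-size floor. A small sketch of the same accounting, with made-up storage IDs and assuming replication factor 1 as in the test:]

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class StaleAccountingSketch {
      // Count blocks visible to a getBlocks()-style call, given stale storage IDs.
      static long visibleBlocks(Map<String, List<String>> blocksByStorage,
          Set<String> stale) {
        return blocksByStorage.entrySet().stream()
            .filter(e -> !stale.contains(e.getKey()))
            .mapToLong(e -> e.getValue().size())
            .sum();
      }

      public static void main(String[] args) {
        Map<String, List<String>> node = Map.of(
            "storage0", List.of("b0", "b1", "b2"),  // count = 3
            "storage1", List.of("b3", "b4"));
        System.out.println(visibleBlocks(node, Set.of()));           // 5 == blockNum
        System.out.println(visibleBlocks(node, Set.of("storage0"))); // 2 == blockNum - count
        System.out.println(visibleBlocks(node, Set.of("storage0", "storage1"))); // 0
      }
    }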
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java
index f1fab273bdb..012dde5f36e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java
@@ -123,6 +123,7 @@ public class TestBalancerService {
     TestBalancer.initConf(conf);
     try {
       setupCluster(conf);
+      TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
       long totalCapacity = addOneDataNode(conf); // make cluster imbalanced
       Thread balancerThread =
@@ -174,6 +175,7 @@
 
     cluster.transitionToActive(0);
     cluster.waitActive();
+    TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
     long totalCapacity = addOneDataNode(conf);
     TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
         cluster, BalancerParameters.DEFAULT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index 185df1246d3..8241f25e4af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -44,11 +44,16 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
@@ -71,6 +76,26 @@ public class TestBalancerWithHANameNodes {
     TestBalancer.initTestSetup();
   }
 
+  public static void waitStoragesNoStale(MiniDFSCluster cluster,
+      ClientProtocol client, int nnIndex) throws Exception {
+    // trigger a full block report and wait all storages out of stale
+    cluster.triggerBlockReports();
+    DatanodeInfo[] dataNodes = client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    GenericTestUtils.waitFor(() -> {
+      BlockManager bm = cluster.getNamesystem(nnIndex).getBlockManager();
+      for (DatanodeInfo dn : dataNodes) {
+        DatanodeStorageInfo[] storageInfos = bm.getDatanodeManager()
+            .getDatanode(dn.getDatanodeUuid()).getStorageInfos();
+        for (DatanodeStorageInfo s : storageInfos) {
+          if (s.areBlockContentsStale()) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, 300, 60000);
+  }
+
   /**
    * Test a cluster with even distribution, then a new empty node is added to
    * the cluster. Test start a cluster with specified number of nodes, and fills
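[Editor's note: waitStoragesNoStale() exists because after an HA failover every storage is marked stale (see markStaleAfterFailover above) until its datanode sends a full block report, and with this patch a stale storage contributes no blocks to the balancer. The helper polls via GenericTestUtils.waitFor(check, 300, 60000), i.e. re-check every 300 ms and give up after 60 s. A self-contained sketch of that polling contract follows, with a hypothetical condition standing in for the "no storage is stale" check; it is not the Hadoop utility itself.]

    import java.util.concurrent.TimeoutException;
    import java.util.function.BooleanSupplier;

    public class WaitForSketch {
      // Poll check every intervalMs until it returns true or timeoutMs elapses.
      static void waitFor(BooleanSupplier check, long intervalMs, long timeoutMs)
          throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!check.getAsBoolean()) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException("condition not met in " + timeoutMs + " ms");
          }
          Thread.sleep(intervalMs);
        }
      }

      public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Hypothetical condition: stands in for "no storage is stale".
        waitFor(() -> System.currentTimeMillis() - start > 1000, 300, 60000);
        System.out.println("condition met");
      }
    }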
@@ -99,13 +124,17 @@ public class TestBalancerWithHANameNodes {
       client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
           ClientProtocol.class).getProxy();
 
-      doTest(conf);
+      doTest(conf, true);
     } finally {
       cluster.shutdown();
     }
   }
 
   void doTest(Configuration conf) throws Exception {
+    doTest(conf, false);
+  }
+
+  void doTest(Configuration conf, boolean withHA) throws Exception {
     int numOfDatanodes = TEST_CAPACITIES.length;
     long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
     // fill up the cluster to be 30% full
@@ -119,6 +148,12 @@ public class TestBalancerWithHANameNodes {
       HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
           cluster.getNameNode(1));
     }
+
+    // all storages are stale after HA
+    if (withHA) {
+      waitStoragesNoStale(cluster, client, 0);
+    }
+
     // start up an empty node with the same capacity and on the same rack
     long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
     String newNodeRack = TestBalancer.RACK2; // new node's rack
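[Editor's note: the doTest change above keeps the old single-argument entry point source-compatible by delegating to a new overload that threads a withHA flag through, so only the HA test path pays the cost of waiting for storages to leave the stale state. A self-contained sketch of that delegation-overload pattern, with hypothetical names and behavior:]

    public class OverloadSketch {
      void doTest(String conf) throws Exception {
        doTest(conf, false); // old callers keep the old behavior
      }

      void doTest(String conf, boolean withHA) throws Exception {
        if (withHA) {
          // stands in for waitStoragesNoStale(cluster, client, 0)
          System.out.println("wait for storages to leave the stale state");
        }
        System.out.println("run balancer test with " + conf);
      }

      public static void main(String[] args) throws Exception {
        new OverloadSketch().doTest("default");
        new OverloadSketch().doTest("ha-conf", true);
      }
    }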