HDFS-16423. Balancer should not get blocks on stale storages (#3883) (#3924)

Reviewed-by: litao <tomleescut@gmail.com>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
(cherry picked from commit db2c3200e6)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

Co-authored-by: qinyuren <1476659627@qq.com>
This commit is contained in:
Wei-Chiu Chuang 2022-01-26 11:54:13 +08:00 committed by GitHub
parent bd13d73334
commit ff3a88b9c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 128 additions and 4 deletions

View File

@ -1642,9 +1642,16 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
if(numBlocks == 0) {
return new BlocksWithLocations(new BlockWithLocations[0]);
}
// skip stale storage
DatanodeStorageInfo[] storageInfos = Arrays
.stream(node.getStorageInfos())
.filter(s -> !s.areBlockContentsStale())
.toArray(DatanodeStorageInfo[]::new);
// starting from a random block
int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
Iterator<BlockInfo> iter = node.getBlockIterator(startBlock);
Iterator<BlockInfo> iter = node.getBlockIterator(startBlock, storageInfos);
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
long totalSize = 0;
BlockInfo curBlock;
@ -1657,8 +1664,8 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
totalSize += addBlock(curBlock, results);
}
if(totalSize<size) {
iter = node.getBlockIterator(); // start from the beginning
for(int i=0; i<startBlock&&totalSize<size; i++) {
iter = node.getBlockIterator(0, storageInfos); // start from the beginning
for(int i = 0; i < startBlock && totalSize < size && iter.hasNext(); i++) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
if (curBlock.getNumBytes() < minBlockSize) {

View File

@ -636,6 +636,17 @@ Iterator<BlockInfo> getBlockIterator(final int startBlock) {
return new BlockIterator(startBlock, getStorageInfos());
}
/**
* Get iterator, which starts iterating from the specified block and storages.
*
* @param startBlock on which blocks are start iterating
* @param storageInfos specified storages
*/
Iterator<BlockInfo> getBlockIterator(
final int startBlock, final DatanodeStorageInfo[] storageInfos) {
return new BlockIterator(startBlock, storageInfos);
}
@VisibleForTesting
public void incrementPendingReplicationWithoutTargets() {
pendingReplicationWithoutTargets++;

View File

@ -168,6 +168,11 @@ public boolean areBlockContentsStale() {
return blockContentsStale;
}
@VisibleForTesting
public void setBlockContentsStale(boolean value) {
blockContentsStale = value;
}
void markStaleAfterFailover() {
heartbeatedSinceFailover = false;
blockContentsStale = true;

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@ -396,4 +397,67 @@ public void testBlockKey() {
}
}
@Test
public void testReadSkipStaleStorage() throws Exception {
final short repFactor = (short) 1;
final int blockNum = 64;
final int storageNum = 2;
final int fileLen = BLOCK_SIZE * blockNum;
final Path path = new Path("testReadSkipStaleStorage");
final Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.storagesPerDatanode(storageNum)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, path, false, 1024, fileLen,
BLOCK_SIZE, repFactor, 0, true);
// get datanode info
ClientProtocol client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(),
ClientProtocol.class).getProxy();
DatanodeInfo[] dataNodes = client.getDatanodeReport(DatanodeReportType.ALL);
// get storage info
BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
DatanodeStorageInfo[] storageInfos = bm0.getDatanodeManager()
.getDatanode(dataNodes[0].getDatanodeUuid()).getStorageInfos();
InetSocketAddress addr = new InetSocketAddress("localhost",
cluster.getNameNodePort());
NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
// check blocks count equals to blockNum
BlockWithLocations[] blocks = namenode.getBlocks(
dataNodes[0], fileLen*2, 0).getBlocks();
assertEquals(blockNum, blocks.length);
// calculate the block count on storage[0]
int count = 0;
for (BlockWithLocations b : blocks) {
for (String s : b.getStorageIDs()) {
if (s.equals(storageInfos[0].getStorageID())) {
count++;
}
}
}
// set storage[0] stale
storageInfos[0].setBlockContentsStale(true);
blocks = namenode.getBlocks(
dataNodes[0], fileLen*2, 0).getBlocks();
assertEquals(blockNum - count, blocks.length);
// set all storage stale
bm0.getDatanodeManager().markAllDatanodesStale();
blocks = namenode.getBlocks(
dataNodes[0], fileLen*2, 0).getBlocks();
assertEquals(0, blocks.length);
}
}

View File

@ -123,6 +123,7 @@ public void testBalancerServiceBalanceTwice() throws Exception {
TestBalancer.initConf(conf);
try {
setupCluster(conf);
TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
long totalCapacity = addOneDataNode(conf); // make cluster imbalanced
Thread balancerThread =
@ -174,6 +175,7 @@ public void testBalancerServiceOnError() throws Exception {
cluster.transitionToActive(0);
cluster.waitActive();
TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
long totalCapacity = addOneDataNode(conf);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);

View File

@ -44,11 +44,16 @@
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Test;
import org.slf4j.LoggerFactory;
@ -71,6 +76,26 @@ public class TestBalancerWithHANameNodes {
TestBalancer.initTestSetup();
}
public static void waitStoragesNoStale(MiniDFSCluster cluster,
ClientProtocol client, int nnIndex) throws Exception {
// trigger a full block report and wait all storages out of stale
cluster.triggerBlockReports();
DatanodeInfo[] dataNodes = client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
GenericTestUtils.waitFor(() -> {
BlockManager bm = cluster.getNamesystem(nnIndex).getBlockManager();
for (DatanodeInfo dn : dataNodes) {
DatanodeStorageInfo[] storageInfos = bm.getDatanodeManager()
.getDatanode(dn.getDatanodeUuid()).getStorageInfos();
for (DatanodeStorageInfo s : storageInfos) {
if (s.areBlockContentsStale()) {
return false;
}
}
}
return true;
}, 300, 60000);
}
/**
* Test a cluster with even distribution, then a new empty node is added to
* the cluster. Test start a cluster with specified number of nodes, and fills
@ -99,13 +124,17 @@ public void testBalancerWithHANameNodes() throws Exception {
client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
ClientProtocol.class).getProxy();
doTest(conf);
doTest(conf, true);
} finally {
cluster.shutdown();
}
}
void doTest(Configuration conf) throws Exception {
doTest(conf, false);
}
void doTest(Configuration conf, boolean withHA) throws Exception {
int numOfDatanodes = TEST_CAPACITIES.length;
long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
// fill up the cluster to be 30% full
@ -119,6 +148,12 @@ void doTest(Configuration conf) throws Exception {
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
cluster.getNameNode(1));
}
// all storages are stale after HA
if (withHA) {
waitStoragesNoStale(cluster, client, 0);
}
// start up an empty node with the same capacity and on the same rack
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
String newNodeRack = TestBalancer.RACK2; // new node's rack