HDFS-17003. Erasure Coding: invalidate wrong block after reporting bad blocks from datanode (#5643). Contributed by hfutatzhanghb.

Reviewed-by: Stephen O'Donnell <sodonnel@apache.org>
Reviewed-by: zhangshuyan <zqingchai@gmail.com>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
hfutatzhanghb 2023-06-08 18:06:51 +08:00 committed by GitHub
parent ddae78b0ec
commit 0e6bd09ae3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 161 additions and 5 deletions

View File

@ -3751,9 +3751,24 @@ public class BlockManager implements BlockStatsMXBean {
// ConcurrentModificationException, when the block is removed from the node
DatanodeDescriptor[] nodesCopy =
nodes.toArray(new DatanodeDescriptor[nodes.size()]);
DatanodeStorageInfo[] storages = null;
if (blk.isStriped()) {
storages = getStorages(blk);
}
for (DatanodeDescriptor node : nodesCopy) {
Block blockToInvalidate = reported;
if (storages != null && blk.isStriped()) {
for (DatanodeStorageInfo s : storages) {
if (s.getDatanodeDescriptor().equals(node)) {
blockToInvalidate = getBlockOnStorage(blk, s);
break;
}
}
}
try {
if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
if (!invalidateBlock(new BlockToMarkCorrupt(blockToInvalidate, blk, null,
Reason.ANY), node, numberReplicas)) {
removedFromBlocksMap = false;
}

View File

@ -73,11 +73,11 @@ abstract public class ReadStripedFileWithDecodingHelper {
public static MiniDFSCluster initializeCluster() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
0);
2);
MiniDFSCluster myCluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATANODES)
.numDataNodes(NUM_DATANODES + 3)
.build();
myCluster.getFileSystem().enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@ -108,6 +108,22 @@ abstract public class ReadStripedFileWithDecodingHelper {
return -1;
}
// The index begins from 1.
public static int findDataNodeAtIndex(MiniDFSCluster cluster,
DistributedFileSystem dfs, Path file, long length, int index) throws IOException {
BlockLocation[] locs = dfs.getFileBlockLocations(file, 0, length);
String name = (locs[0].getNames())[index - 1];
int dnIndex = 0;
for (DataNode dn : cluster.getDataNodes()) {
int port = dn.getXferPort();
if (name.contains(Integer.toString(port))) {
return dnIndex;
}
dnIndex++;
}
return -1;
}
/**
* Cross product of FILE_LENGTHS, NUM_PARITY_UNITS+1, NUM_PARITY_UNITS.
* Input for parameterized tests classes.

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -50,6 +51,7 @@ import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.BLOCK_SIZ
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findDataNodeAtIndex;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
@ -96,7 +98,7 @@ public class TestReadStripedFileWithDecoding {
.get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
// find the first block file
// Find the first block file.
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
Assert.assertTrue("Block file does not exist", blkFile.exists());
@ -169,6 +171,129 @@ public class TestReadStripedFileWithDecoding {
}
}
/**
* This unit test try to cover the below situation:
* Suppose we have an EC file with RS(d,p) policy and block group id
* is blk_-9223372036845119810_1920002.
* If the first and second data block in this ec block group are corrupted,
* meanwhile we read this EC file.
* It will trigger reportBadBlock RPC and
* add the blk_-9223372036845119810_1920002
* and blk_-9223372036845119809_1920002 blocks to corruptReplicas.
* It will also reconstruct the two blocks and send IBR to namenode,
* then execute BlockManager#addStoredBlock and
* invalidateCorruptReplicas method. Suppose we first receive the IBR of
* blk_-9223372036845119810_1920002, then in invalidateCorruptReplicas method,
* it will only invalidate 9223372036845119809_1920002 on the two datanodes contains
* the two corrupt blocks.
*
* @throws Exception
*/
@Test
public void testCorruptionECBlockInvalidate() throws Exception {
final Path file = new Path("/invalidate_corrupted");
final int length = BLOCK_SIZE * NUM_DATA_UNITS;
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
DFSTestUtil.writeFile(dfs, file, bytes);
int dnIndex = findFirstDataNode(cluster, dfs, file,
CELL_SIZE * NUM_DATA_UNITS);
int dnIndex2 = findDataNodeAtIndex(cluster, dfs, file,
CELL_SIZE * NUM_DATA_UNITS, 2);
Assert.assertNotEquals(-1, dnIndex);
Assert.assertNotEquals(-1, dnIndex2);
LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
.getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
.get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
final Block b = blks[0].getBlock().getLocalBlock();
final Block b2 = blks[1].getBlock().getLocalBlock();
// Find the first block file.
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
Assert.assertTrue("Block file does not exist", blkFile.exists());
// Corrupt the block file.
LOG.info("Deliberately corrupting file " + blkFile.getName());
try (FileOutputStream out = new FileOutputStream(blkFile)) {
out.write("corruption".getBytes());
out.flush();
}
// Find the second block file.
File storageDir2 = cluster.getInstanceStorageDir(dnIndex2, 0);
File blkFile2 = MiniDFSCluster.getBlockFile(storageDir2, blks[1].getBlock());
Assert.assertTrue("Block file does not exist", blkFile2.exists());
// Corrupt the second block file.
LOG.info("Deliberately corrupting file " + blkFile2.getName());
try (FileOutputStream out = new FileOutputStream(blkFile2)) {
out.write("corruption".getBytes());
out.flush();
}
// Disable the heartbeat from DN so that the corrupted block record is kept
// in NameNode.
for (DataNode dataNode : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
}
try {
// Do stateful read.
StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
ByteBuffer.allocate(1024));
// Check whether the corruption has been reported to the NameNode.
final FSNamesystem ns = cluster.getNamesystem();
final BlockManager bm = ns.getBlockManager();
BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
.asFile().getBlocks())[0];
GenericTestUtils.waitFor(() -> {
if (bm.getCorruptReplicas(blockInfo) == null) {
return false;
}
return bm.getCorruptReplicas(blockInfo).size() == 2;
}, 250, 60000);
// Double check.
Assert.assertEquals(2, bm.getCorruptReplicas(blockInfo).size());
DatanodeDescriptor dnd =
NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex).getDatanodeId());
DatanodeDescriptor dnd2 =
NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex2).getDatanodeId());
for (DataNode datanode : cluster.getDataNodes()) {
if (!datanode.getDatanodeUuid().equals(dnd.getDatanodeUuid()) &&
!datanode.getDatanodeUuid().equals(dnd2.getDatanodeUuid())) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false);
}
}
GenericTestUtils.waitFor(() -> {
return bm.containsInvalidateBlock(
blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b);
}, 250, 60000);
Assert.assertTrue(bm.containsInvalidateBlock(
blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
GenericTestUtils.waitFor(() -> {
return bm.containsInvalidateBlock(
blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2);
}, 250, 60000);
Assert.assertTrue(bm.containsInvalidateBlock(
blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2));
} finally {
for (DataNode datanode : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false);
}
}
}
@Test
public void testMoreThanOneCorruptedBlock() throws IOException {
final Path file = new Path("/corrupted");