HDFS-14946. Erasure Coding: Block recovery failed during decommissioning. Contributed by Fei Hui.

This commit is contained in:
Ayush Saxena 2019-11-05 01:37:15 +05:30
parent 51e7d1b37e
commit 2ffec347eb
2 changed files with 116 additions and 11 deletions

View File

@ -2076,17 +2076,18 @@ public class BlockManager implements BlockStatsMXBean {
numReplicas.decommissioning() -
numReplicas.liveEnteringMaintenanceReplicas();
}
byte[] indices = new byte[liveBlockIndices.size()];
for (int i = 0 ; i < liveBlockIndices.size(); i++) {
indices[i] = liveBlockIndices.get(i);
}
final DatanodeDescriptor[] newSrcNodes =
new DatanodeDescriptor[srcNodes.length];
byte[] newIndices = new byte[liveBlockIndices.size()];
adjustSrcNodesAndIndices((BlockInfoStriped)block,
srcNodes, liveBlockIndices, newSrcNodes, newIndices);
byte[] busyIndices = new byte[liveBusyBlockIndices.size()];
for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
busyIndices[i] = liveBusyBlockIndices.get(i);
}
return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
priority, indices, busyIndices);
priority, newIndices, busyIndices);
} else {
return new ReplicationWork(block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
@ -2094,6 +2095,32 @@ public class BlockManager implements BlockStatsMXBean {
}
}
/**
* Adjust srcNodes and indices which are used to reconstruction block.
* We should guarantee the indexes of first minRequiredSources nodes
+ are different.
*/
private void adjustSrcNodesAndIndices(BlockInfoStriped block,
DatanodeDescriptor[] srcNodes, List<Byte> indices,
DatanodeDescriptor[] newSrcNodes, byte[] newIndices) {
BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
List<Integer> skipIndexList = new ArrayList<>();
for (int i = 0, j = 0; i < srcNodes.length; i++) {
if (!bitSet.get(indices.get(i))) {
bitSet.set(indices.get(i));
newSrcNodes[j] = srcNodes[i];
newIndices[j++] = indices.get(i);
} else {
skipIndexList.add(i);
}
}
for(int i = srcNodes.length - skipIndexList.size(), j = 0;
i < srcNodes.length; i++, j++) {
newSrcNodes[i] = srcNodes[skipIndexList.get(j)];
newIndices[i] = indices.get(skipIndexList.get(j));
}
}
private boolean validateReconstructionWork(BlockReconstructionWork rw) {
BlockInfo block = rw.getBlock();
int priority = rw.getPriority();

View File

@ -500,14 +500,15 @@ public class TestDecommissionWithStriped {
return new DFSClient(nn.getNameNodeAddress(), conf);
}
private void writeStripedFile(DistributedFileSystem dfs, Path ecFile,
int writeBytes) throws IOException, Exception {
private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile,
int writeBytes) throws Exception {
byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
DFSTestUtil.writeFile(dfs, ecFile, new String(bytes));
StripedFileTestUtil.waitBlockGroupsReported(dfs, ecFile.toString());
DFSTestUtil.writeFile(fs, ecFile, new String(bytes));
StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
StripedFileTestUtil.checkData(dfs, ecFile, writeBytes,
StripedFileTestUtil.checkData(fs, ecFile, writeBytes,
new ArrayList<DatanodeInfo>(), null, blockGroupSize);
return bytes;
}
private void writeConfigFile(Path name, List<String> nodes)
@ -958,4 +959,81 @@ public class TestDecommissionWithStriped {
assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
cleanupFile(dfs, ecFile);
}
/**
* Test recovery for an ec block, its storage array contains these internal
* blocks which are {b0, b1, b2, b3, null, b5, b6, b7, b8, b0, b1, b2,
* b3}, array[0]{b0} in decommissioning, array[1-3]{b1, b2, b3} are
* in decommissioned. array[4] is null, array[5-12]{b[5-8],b[0-3]} are
* in live.
*/
@Test (timeout = 120000)
public void testRecoveryWithDecommission() throws Exception {
final Path ecFile = new Path(ecDir, "testRecoveryWithDecommission");
int writeBytes = cellSize * dataBlocks;
byte[] originBytesArray = writeStripedFile(dfs, ecFile, writeBytes);
List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
.getAllBlocks();
LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
DatanodeInfo[] dnList = blk.getLocations();
BlockInfoStriped blockInfo =
(BlockInfoStriped)bm.getStoredBlock(
new Block(blk.getBlock().getBlockId()));
// Decommission datanode dn0 contains block b0
// Aim to add storageinfo of replicated block b0 to storages[9] of ec block
List<DatanodeInfo> decommissionedNodes = new ArrayList<>();
decommissionedNodes.add(dnList[0]);
decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
// Now storages of ec block are (b0{decommissioned}, b[1-8]{live},
// b0{live})
assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
assertEquals(1, bm.countNodes(blockInfo).decommissioned());
int decommissionNodesNum = 4;
// Decommission nodes contain blocks of b[0-3]
// dn0 has been decommissioned
for (int i = 1; i < decommissionNodesNum; i++) {
decommissionedNodes.add(dnList[i]);
}
decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
// Now storages of ec block are (b[0-3]{decommissioned}, b[4-8]{live},
// b0{live}, b[1-3]{live})
// There are 9 live and 4 decommissioned internal blocks
assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
assertEquals(4, bm.countNodes(blockInfo).decommissioned());
// There are no reconstruction tasks
assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
.getNumPendingNodes());
assertEquals(0, bm.getUnderReplicatedNotMissingBlocks());
// Set dn0 in decommissioning
// So that the block on dn0 can be used for reconstruction task
DatanodeDescriptor dn0 = bm.getDatanodeManager()
.getDatanode(dnList[0].getDatanodeUuid());
dn0.startDecommission();
// Stop the datanode contains b4
DataNode dn = cluster.getDataNode(
dnList[decommissionNodesNum].getIpcPort());
cluster.stopDataNode(dnList[decommissionNodesNum].getXferAddr());
cluster.setDataNodeDead(dn.getDatanodeId());
// Now storages of ec block are (b[0]{decommissioning},
// b[1-3]{decommissioned}, null, b[5-8]{live}, b0{live}, b[1-3]{live})
// There are 8 live and 1 decommissioning internal blocks
// Wait for reconstruction EC block.
GenericTestUtils.waitFor(
() -> bm.countNodes(blockInfo).liveReplicas() == 9,
100, 10000);
byte[] readBytesArray = new byte[writeBytes];
StripedFileTestUtil.verifyPread(dfs, ecFile, writeBytes,
originBytesArray, readBytesArray, ecPolicy);
cleanupFile(dfs, ecFile);
}
}