HDFS-14429. Block remain in COMMITTED but not COMPLETE caused by Decommission. Contributed by Yicong Cai.
This commit is contained in:
parent
72900de5f4
commit
8053085388
|
@ -3342,7 +3342,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
|
||||
int curReplicaDelta;
|
||||
if (result == AddBlockResult.ADDED) {
|
||||
curReplicaDelta = (node.isDecommissioned()) ? 0 : 1;
|
||||
curReplicaDelta =
|
||||
(node.isDecommissioned() || node.isDecommissionInProgress()) ? 0 : 1;
|
||||
if (logEveryBlock) {
|
||||
blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})",
|
||||
node, storedBlock, storedBlock.getNumBytes());
|
||||
|
@ -3368,9 +3369,11 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
int numLiveReplicas = num.liveReplicas();
|
||||
int pendingNum = pendingReconstruction.getNumReplicas(storedBlock);
|
||||
int numCurrentReplica = numLiveReplicas + pendingNum;
|
||||
int numUsableReplicas = num.liveReplicas() +
|
||||
num.decommissioning() + num.liveEnteringMaintenanceReplicas();
|
||||
|
||||
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
||||
hasMinStorage(storedBlock, numLiveReplicas)) {
|
||||
hasMinStorage(storedBlock, numUsableReplicas)) {
|
||||
addExpectedReplicasToPending(storedBlock);
|
||||
completeBlock(storedBlock, null, false);
|
||||
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||
|
@ -963,6 +964,95 @@ public class TestDecommission extends AdminStatesBaseTest {
|
|||
assertTrackedAndPending(dm.getDatanodeAdminManager(), 2, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simulate the following scene:
|
||||
* Client writes Block(bk1) to three data nodes (dn1/dn2/dn3). bk1 has
|
||||
* been completely written to three data nodes, and the data node succeeds
|
||||
* FinalizeBlock, joins IBR and waits to report to NameNode. The client
|
||||
* commits bk1 after receiving the ACK. When the DN has not been reported
|
||||
* to the IBR, all three nodes dn1/dn2/dn3 enter Decommissioning and then the
|
||||
* DN reports the IBR.
|
||||
*/
|
||||
@Test(timeout=120000)
|
||||
public void testAllocAndIBRWhileDecommission() throws IOException {
|
||||
LOG.info("Starting test testAllocAndIBRWhileDecommission");
|
||||
startCluster(1, 6);
|
||||
getCluster().waitActive();
|
||||
FSNamesystem ns = getCluster().getNamesystem(0);
|
||||
DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
|
||||
|
||||
Path file = new Path("/testAllocAndIBRWhileDecommission");
|
||||
DistributedFileSystem dfs = getCluster().getFileSystem();
|
||||
FSDataOutputStream out = dfs.create(file, true,
|
||||
getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY,
|
||||
4096), (short) 3, blockSize);
|
||||
|
||||
// Write first block data to the file, write one more long number will
|
||||
// commit first block and allocate second block.
|
||||
long writtenBytes = 0;
|
||||
while (writtenBytes + 8 < blockSize) {
|
||||
out.writeLong(writtenBytes);
|
||||
writtenBytes += 8;
|
||||
}
|
||||
out.hsync();
|
||||
|
||||
// Get fist block information
|
||||
LocatedBlock firstLocatedBlock = NameNodeAdapter.getBlockLocations(
|
||||
getCluster().getNameNode(), "/testAllocAndIBRWhileDecommission", 0,
|
||||
fileSize).getLastLocatedBlock();
|
||||
DatanodeInfo[] firstBlockLocations = firstLocatedBlock.getLocations();
|
||||
|
||||
// Close first block's datanode IBR.
|
||||
ArrayList<String> toDecom = new ArrayList<>();
|
||||
ArrayList<DatanodeInfo> decomDNInfos = new ArrayList<>();
|
||||
for (DatanodeInfo datanodeInfo : firstBlockLocations) {
|
||||
toDecom.add(datanodeInfo.getXferAddr());
|
||||
decomDNInfos.add(dm.getDatanode(datanodeInfo));
|
||||
DataNode dn = getDataNode(datanodeInfo);
|
||||
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||
DataNodeTestUtils.pauseIBR(dn);
|
||||
}
|
||||
|
||||
// Write more than one block, then commit first block, allocate second
|
||||
// block.
|
||||
while (writtenBytes <= blockSize) {
|
||||
out.writeLong(writtenBytes);
|
||||
writtenBytes += 8;
|
||||
}
|
||||
out.hsync();
|
||||
|
||||
// IBR closed, so the first block UCState is COMMITTED, not COMPLETE.
|
||||
assertEquals(BlockUCState.COMMITTED,
|
||||
((BlockInfo) firstLocatedBlock.getBlock().getLocalBlock())
|
||||
.getBlockUCState());
|
||||
|
||||
// Decommission all nodes of the first block
|
||||
initExcludeHosts(toDecom);
|
||||
refreshNodes(0);
|
||||
|
||||
// Waiting nodes at DECOMMISSION_INPROGRESS state and then resume IBR.
|
||||
for (DatanodeInfo dnDecom : decomDNInfos) {
|
||||
waitNodeState(dnDecom, AdminStates.DECOMMISSION_INPROGRESS);
|
||||
DataNodeTestUtils.resumeIBR(getDataNode(dnDecom));
|
||||
}
|
||||
|
||||
// Recover first block's datanode hertbeat, will report the first block
|
||||
// state to NN.
|
||||
for (DataNode dn : getCluster().getDataNodes()) {
|
||||
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||
}
|
||||
|
||||
// NN receive first block report, transfer block state from COMMITTED to
|
||||
// COMPLETE.
|
||||
assertEquals(BlockUCState.COMPLETE,
|
||||
((BlockInfo) firstLocatedBlock.getBlock().getLocalBlock())
|
||||
.getBlockUCState());
|
||||
|
||||
out.close();
|
||||
|
||||
shutdownCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests restart of namenode while datanode hosts are added to exclude file
|
||||
**/
|
||||
|
|
|
@ -101,6 +101,11 @@ public class DataNodeTestUtils {
|
|||
public static void pauseIBR(DataNode dn) {
|
||||
dn.setIBRDisabledForTest(true);
|
||||
}
|
||||
|
||||
public static void resumeIBR(DataNode dn) {
|
||||
dn.setIBRDisabledForTest(false);
|
||||
}
|
||||
|
||||
public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
|
||||
DataNode dn, DatanodeID datanodeid, final Configuration conf,
|
||||
boolean connectToDnViaHostname) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue