HDFS-14429. Block remain in COMMITTED but not COMPLETE caused by Decommission. Contributed by Yicong Cai.
(cherry picked from commit 8053085388)
(cherry picked from commit d28e624958)
This commit is contained in:
parent
4d21310865
commit
608e7044a5
|
@ -3327,7 +3327,8 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
|
|
||||||
int curReplicaDelta;
|
int curReplicaDelta;
|
||||||
if (result == AddBlockResult.ADDED) {
|
if (result == AddBlockResult.ADDED) {
|
||||||
curReplicaDelta = (node.isDecommissioned()) ? 0 : 1;
|
curReplicaDelta =
|
||||||
|
(node.isDecommissioned() || node.isDecommissionInProgress()) ? 0 : 1;
|
||||||
if (logEveryBlock) {
|
if (logEveryBlock) {
|
||||||
blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})",
|
blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})",
|
||||||
node, storedBlock, storedBlock.getNumBytes());
|
node, storedBlock, storedBlock.getNumBytes());
|
||||||
|
@ -3353,9 +3354,11 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
int numLiveReplicas = num.liveReplicas();
|
int numLiveReplicas = num.liveReplicas();
|
||||||
int pendingNum = pendingReconstruction.getNumReplicas(storedBlock);
|
int pendingNum = pendingReconstruction.getNumReplicas(storedBlock);
|
||||||
int numCurrentReplica = numLiveReplicas + pendingNum;
|
int numCurrentReplica = numLiveReplicas + pendingNum;
|
||||||
|
int numUsableReplicas = num.liveReplicas() +
|
||||||
|
num.decommissioning() + num.liveEnteringMaintenanceReplicas();
|
||||||
|
|
||||||
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
||||||
hasMinStorage(storedBlock, numLiveReplicas)) {
|
hasMinStorage(storedBlock, numUsableReplicas)) {
|
||||||
addExpectedReplicasToPending(storedBlock);
|
addExpectedReplicasToPending(storedBlock);
|
||||||
completeBlock(storedBlock, null, false);
|
completeBlock(storedBlock, null, false);
|
||||||
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
||||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
|
@ -962,6 +963,95 @@ public class TestDecommission extends AdminStatesBaseTest {
|
||||||
BlockManagerTestUtil.recheckDecommissionState(dm);
|
BlockManagerTestUtil.recheckDecommissionState(dm);
|
||||||
assertTrackedAndPending(dm.getDatanodeAdminManager(), 2, 0);
|
assertTrackedAndPending(dm.getDatanodeAdminManager(), 2, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simulate the following scene:
|
||||||
|
* Client writes Block(bk1) to three data nodes (dn1/dn2/dn3). bk1 has
|
||||||
|
* been completely written to three data nodes, and the data node succeeds
|
||||||
|
* FinalizeBlock, joins IBR and waits to report to NameNode. The client
|
||||||
|
* commits bk1 after receiving the ACK. Before the DNs have sent their
|
||||||
|
* IBRs, all three nodes dn1/dn2/dn3 enter Decommissioning, and only then
|
||||||
|
* do the DNs send their IBRs to the NameNode.
|
||||||
|
*/
|
||||||
|
@Test(timeout=120000)
|
||||||
|
public void testAllocAndIBRWhileDecommission() throws IOException {
|
||||||
|
LOG.info("Starting test testAllocAndIBRWhileDecommission");
|
||||||
|
startCluster(1, 6);
|
||||||
|
getCluster().waitActive();
|
||||||
|
FSNamesystem ns = getCluster().getNamesystem(0);
|
||||||
|
DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
|
||||||
|
|
||||||
|
Path file = new Path("/testAllocAndIBRWhileDecommission");
|
||||||
|
DistributedFileSystem dfs = getCluster().getFileSystem();
|
||||||
|
FSDataOutputStream out = dfs.create(file, true,
|
||||||
|
getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY,
|
||||||
|
4096), (short) 3, blockSize);
|
||||||
|
|
||||||
|
// Write almost a full first block to the file; writing one more long
|
||||||
|
// will commit the first block and allocate the second block.
|
||||||
|
long writtenBytes = 0;
|
||||||
|
while (writtenBytes + 8 < blockSize) {
|
||||||
|
out.writeLong(writtenBytes);
|
||||||
|
writtenBytes += 8;
|
||||||
|
}
|
||||||
|
out.hsync();
|
||||||
|
|
||||||
|
// Get first block information
|
||||||
|
LocatedBlock firstLocatedBlock = NameNodeAdapter.getBlockLocations(
|
||||||
|
getCluster().getNameNode(), "/testAllocAndIBRWhileDecommission", 0,
|
||||||
|
fileSize).getLastLocatedBlock();
|
||||||
|
DatanodeInfo[] firstBlockLocations = firstLocatedBlock.getLocations();
|
||||||
|
|
||||||
|
// Pause IBRs on the first block's datanodes.
|
||||||
|
ArrayList<String> toDecom = new ArrayList<>();
|
||||||
|
ArrayList<DatanodeInfo> decomDNInfos = new ArrayList<>();
|
||||||
|
for (DatanodeInfo datanodeInfo : firstBlockLocations) {
|
||||||
|
toDecom.add(datanodeInfo.getXferAddr());
|
||||||
|
decomDNInfos.add(dm.getDatanode(datanodeInfo));
|
||||||
|
DataNode dn = getDataNode(datanodeInfo);
|
||||||
|
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||||
|
DataNodeTestUtils.pauseIBR(dn);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write more than one block, then commit first block, allocate second
|
||||||
|
// block.
|
||||||
|
while (writtenBytes <= blockSize) {
|
||||||
|
out.writeLong(writtenBytes);
|
||||||
|
writtenBytes += 8;
|
||||||
|
}
|
||||||
|
out.hsync();
|
||||||
|
|
||||||
|
// IBRs are paused, so the first block's UCState is COMMITTED, not COMPLETE.
|
||||||
|
assertEquals(BlockUCState.COMMITTED,
|
||||||
|
((BlockInfo) firstLocatedBlock.getBlock().getLocalBlock())
|
||||||
|
.getBlockUCState());
|
||||||
|
|
||||||
|
// Decommission all nodes of the first block
|
||||||
|
initExcludeHosts(toDecom);
|
||||||
|
refreshNodes(0);
|
||||||
|
|
||||||
|
// Wait for each node to reach DECOMMISSION_INPROGRESS, then resume its IBR.
|
||||||
|
for (DatanodeInfo dnDecom : decomDNInfos) {
|
||||||
|
waitNodeState(dnDecom, AdminStates.DECOMMISSION_INPROGRESS);
|
||||||
|
DataNodeTestUtils.resumeIBR(getDataNode(dnDecom));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trigger heartbeats on all datanodes so that the first block's state
|
||||||
|
// is reported to the NN.
|
||||||
|
for (DataNode dn : getCluster().getDataNodes()) {
|
||||||
|
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The NN receives the first block's report and moves the block state from
|
||||||
|
// COMMITTED to COMPLETE.
|
||||||
|
assertEquals(BlockUCState.COMPLETE,
|
||||||
|
((BlockInfo) firstLocatedBlock.getBlock().getLocalBlock())
|
||||||
|
.getBlockUCState());
|
||||||
|
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
shutdownCluster();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests restart of namenode while datanode hosts are added to exclude file
|
* Tests restart of namenode while datanode hosts are added to exclude file
|
||||||
|
|
|
@ -101,6 +101,11 @@ public class DataNodeTestUtils {
|
||||||
public static void pauseIBR(DataNode dn) {
|
public static void pauseIBR(DataNode dn) {
|
||||||
dn.setIBRDisabledForTest(true);
|
dn.setIBRDisabledForTest(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void resumeIBR(DataNode dn) {
|
||||||
|
dn.setIBRDisabledForTest(false);
|
||||||
|
}
|
||||||
|
|
||||||
public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
|
public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
|
||||||
DataNode dn, DatanodeID datanodeid, final Configuration conf,
|
DataNode dn, DatanodeID datanodeid, final Configuration conf,
|
||||||
boolean connectToDnViaHostname) throws IOException {
|
boolean connectToDnViaHostname) throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue