HDFS-15086. Blocks scheduled counter never gets decremented if the block is deleted before replication. Contributed by hemanthboyina.

Commit: e13ea1b094
Parent: 9a1ce410f6
Author: Surendra Singh Lilhore
Date: 2020-02-13 16:30:19 +05:30
5 changed files with 146 additions and 35 deletions
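
In short: when the NameNode schedules a replica transfer, it increments a per-DataNode "blocks scheduled" counter, which is normally decremented when the transfer completes. If the file was deleted while the work was still pending, nothing decremented the counter, so it stayed elevated forever. A toy sketch of that accounting bug and its fix (hypothetical names, not HDFS code):

import java.util.HashMap;
import java.util.Map;

// Toy model of the leak: the counter is bumped when replication work is
// scheduled, but before this patch nothing rolled it back when the block
// was deleted while the work was still pending.
class BlocksScheduledLeakSketch {
  static final Map<String, Integer> blocksScheduled = new HashMap<>();

  static void schedule(String block, String... targets) {
    for (String t : targets) {
      blocksScheduled.merge(t, 1, Integer::sum); // incremented on schedule
    }
  }

  // The fix, in miniature: on delete, decrement the counter for every
  // target that never completed the transfer.
  static void delete(String block, String... pendingTargets) {
    for (String t : pendingTargets) {
      blocksScheduled.merge(t, -1, Integer::sum);
    }
  }

  public static void main(String[] args) {
    schedule("blk_1", "dn1", "dn2");
    delete("blk_1", "dn1", "dn2");
    System.out.println(blocksScheduled); // {dn1=0, dn2=0}, no leak
  }
}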

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -1111,15 +1112,15 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeStorageInfo[] expectedStorages =
           blk.getUnderConstructionFeature().getExpectedStorageLocations();
       if (expectedStorages.length - blk.numNodes() > 0) {
-        ArrayList<DatanodeDescriptor> pendingNodes = new ArrayList<>();
+        ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
         for (DatanodeStorageInfo storage : expectedStorages) {
           DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
           if (blk.findStorageInfo(dnd) == null) {
-            pendingNodes.add(dnd);
+            pendingNodes.add(storage);
           }
         }
         pendingReconstruction.increment(blk,
-            pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
+            pendingNodes.toArray(new DatanodeStorageInfo[pendingNodes.size()]));
       }
     }
   }
@@ -2169,8 +2170,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Move the block-replication into a "pending" state.
     // The reason we use 'pending' is so we can retry
     // reconstructions that fail after an appropriate amount of time.
-    pendingReconstruction.increment(block,
-        DatanodeStorageInfo.toDatanodeDescriptors(targets));
+    pendingReconstruction.increment(block, targets);
     blockLog.debug("BLOCK* block {} is moved from neededReconstruction to "
         + "pendingReconstruction", block);
@@ -4037,7 +4037,7 @@ public class BlockManager implements BlockStatsMXBean {
     BlockInfo storedBlock = getStoredBlock(block);
     if (storedBlock != null &&
         block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
-      if (pendingReconstruction.decrement(storedBlock, node)) {
+      if (pendingReconstruction.decrement(storedBlock, storageInfo)) {
        NameNode.getNameNodeMetrics().incSuccessfulReReplications();
      }
    }
@@ -4452,7 +4452,11 @@ public class BlockManager implements BlockStatsMXBean {
     addToInvalidates(block);
     removeBlockFromMap(block);
     // Remove the block from pendingReconstruction and neededReconstruction
-    pendingReconstruction.remove(block);
+    PendingBlockInfo remove = pendingReconstruction.remove(block);
+    if (remove != null) {
+      DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
+          .toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
+    }
     neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
     postponedMisreplicatedBlocks.remove(block);
   }
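
The change above depends on remove() handing back the evicted PendingBlockInfo in the same synchronized step, so the caller can roll back the scheduled counters without a separate get-then-remove race. A self-contained sketch of that design choice, with toy types standing in for the HDFS classes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy pending queue: remove() returns the evicted entry atomically so the
// caller can undo side effects (here, printing; in the patch, decrementing
// the per-target blocks-scheduled counters) without a get/remove race.
class PendingQueueSketch {
  private final Map<String, List<String>> pending = new HashMap<>();

  synchronized void add(String block, String... targets) {
    pending.put(block, new ArrayList<>(Arrays.asList(targets)));
  }

  synchronized List<String> remove(String block) {
    return pending.remove(block); // null if the block was not pending
  }

  public static void main(String[] args) {
    PendingQueueSketch q = new PendingQueueSketch();
    q.add("blk_1", "dn1:storage1", "dn2:storage7");
    List<String> removed = q.remove("blk_1");
    if (removed != null) {
      System.out.println("roll back scheduled counters for " + removed);
    }
  }
}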

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -1688,9 +1688,25 @@ public class DatanodeManager {
       List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
           numReplicationTasks);
       if (pendingList != null && !pendingList.isEmpty()) {
+        // If the block is deleted, the block size will become
+        // BlockCommand.NO_ACK (Long.MAX_VALUE). Such blocks do not need to
+        // be sent for replication or reconstruction, so drop them here and
+        // roll back their scheduled-block counters.
+        Iterator<BlockTargetPair> iterator = pendingList.iterator();
+        while (iterator.hasNext()) {
+          BlockTargetPair cmd = iterator.next();
+          if (cmd.block != null
+              && cmd.block.getNumBytes() == BlockCommand.NO_ACK) {
+            // block deleted
+            DatanodeStorageInfo.decrementBlocksScheduled(cmd.targets);
+            iterator.remove();
+          }
+        }
+        if (!pendingList.isEmpty()) {
-        cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
-            pendingList));
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+              pendingList));
+        }
       }
       // check pending erasure coding tasks
       List<BlockECReconstructionInfo> pendingECList = nodeinfo
           .getErasureCodeCommand(numECTasks);
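
The filter added above, reduced to a self-contained sketch; toy types stand in for BlockTargetPair, and the real code uses BlockCommand.NO_ACK (Long.MAX_VALUE, per the comment) as the sentinel and DatanodeStorageInfo.decrementBlocksScheduled for the rollback:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class NoAckFilterSketch {
  static final long NO_ACK = Long.MAX_VALUE;

  static class BlockTargetPair {
    final long numBytes;
    final String[] targets;
    BlockTargetPair(long numBytes, String... targets) {
      this.numBytes = numBytes;
      this.targets = targets;
    }
  }

  public static void main(String[] args) {
    List<BlockTargetPair> pendingList = new ArrayList<>(Arrays.asList(
        new BlockTargetPair(1024, "dn1"),
        new BlockTargetPair(NO_ACK, "dn2"))); // dn2's block was deleted
    Iterator<BlockTargetPair> it = pendingList.iterator();
    while (it.hasNext()) {
      BlockTargetPair cmd = it.next();
      if (cmd.numBytes == NO_ACK) {
        // the real code calls decrementBlocksScheduled(cmd.targets) here
        it.remove();
      }
    }
    System.out.println(pendingList.size()); // 1: only the live block is sent
  }
}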

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java

@@ -81,7 +81,7 @@ class PendingReconstructionBlocks {
    * @param block The corresponding block
    * @param targets The DataNodes where replicas of the block should be placed
    */
-  void increment(BlockInfo block, DatanodeDescriptor... targets) {
+  void increment(BlockInfo block, DatanodeStorageInfo... targets) {
     synchronized (pendingReconstructions) {
       PendingBlockInfo found = pendingReconstructions.get(block);
       if (found == null) {
@@ -101,7 +101,7 @@ class PendingReconstructionBlocks {
    * @param dn The DataNode that finishes the reconstruction
    * @return true if the block is decremented to 0 and got removed.
    */
-  boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
+  boolean decrement(BlockInfo block, DatanodeStorageInfo dn) {
     boolean removed = false;
     synchronized (pendingReconstructions) {
       PendingBlockInfo found = pendingReconstructions.get(block);
@@ -124,9 +124,9 @@ class PendingReconstructionBlocks {
    *          The given block whose pending reconstruction requests need to be
    *          removed
    */
-  void remove(BlockInfo block) {
+  PendingBlockInfo remove(BlockInfo block) {
     synchronized (pendingReconstructions) {
-      pendingReconstructions.remove(block);
+      return pendingReconstructions.remove(block);
     }
   }
@@ -200,11 +200,11 @@ class PendingReconstructionBlocks {
    */
   static class PendingBlockInfo {
     private long timeStamp;
-    private final List<DatanodeDescriptor> targets;
+    private final List<DatanodeStorageInfo> targets;

-    PendingBlockInfo(DatanodeDescriptor[] targets) {
+    PendingBlockInfo(DatanodeStorageInfo[] targets) {
       this.timeStamp = monotonicNow();
-      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+      this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>()
           : new ArrayList<>(Arrays.asList(targets));
     }
@@ -216,9 +216,9 @@ class PendingReconstructionBlocks {
       timeStamp = monotonicNow();
     }

-    void incrementReplicas(DatanodeDescriptor... newTargets) {
+    void incrementReplicas(DatanodeStorageInfo... newTargets) {
       if (newTargets != null) {
-        for (DatanodeDescriptor newTarget : newTargets) {
+        for (DatanodeStorageInfo newTarget : newTargets) {
           if (!targets.contains(newTarget)) {
             targets.add(newTarget);
           }
@@ -226,13 +226,23 @@ class PendingReconstructionBlocks {
       }
     }

-    void decrementReplicas(DatanodeDescriptor dn) {
-      targets.remove(dn);
+    void decrementReplicas(DatanodeStorageInfo dn) {
+      Iterator<DatanodeStorageInfo> iterator = targets.iterator();
+      while (iterator.hasNext()) {
+        DatanodeStorageInfo next = iterator.next();
+        if (next.getDatanodeDescriptor() == dn.getDatanodeDescriptor()) {
+          iterator.remove();
+        }
+      }
     }

     int getNumReplicas() {
       return targets.size();
     }
+
+    List<DatanodeStorageInfo> getTargets() {
+      return targets;
+    }
   }

   /*
@@ -318,4 +328,14 @@ class PendingReconstructionBlocks {
       }
     }
   }
+
+  List<DatanodeStorageInfo> getTargets(BlockInfo block) {
+    synchronized (pendingReconstructions) {
+      PendingBlockInfo found = pendingReconstructions.get(block);
+      if (found != null) {
+        return found.targets;
+      }
+    }
+    return null;
+  }
 }
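
One subtlety in decrementReplicas() above: it matches targets by their owning DatanodeDescriptor rather than calling targets.remove(dn), presumably because the storage that reports completion need not be the same DatanodeStorageInfo instance that was scheduled. A toy illustration of why matching on the node matters:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DecrementByNodeSketch {
  static class Storage {
    final String datanode; // owning DataNode
    final String storageId;
    Storage(String datanode, String storageId) {
      this.datanode = datanode;
      this.storageId = storageId;
    }
  }

  public static void main(String[] args) {
    List<Storage> targets = new ArrayList<>(Arrays.asList(
        new Storage("dn1", "s1"), new Storage("dn2", "s2")));
    // The completion is reported by a different storage on the same node:
    Storage reported = new Storage("dn1", "s9");
    // Matching on the owning node (as decrementReplicas now does) still
    // clears dn1's pending target; List.remove(reported) would miss it.
    targets.removeIf(t -> t.datanode.equals(reported.datanode));
    System.out.println(targets.size()); // 1
  }
}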

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java

@@ -22,11 +22,19 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.util.ArrayList;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.After;
 import org.junit.Test;
@@ -129,4 +137,69 @@ public class TestBlocksScheduledCounter {
           0, descriptor.getBlocksScheduled());
     }
   }
+
+  /**
+   * Test that the blocks scheduled counter is decremented when a file with
+   * scheduled blocks is deleted.
+   * @throws Exception
+   */
+  @Test
+  public void testScheduledBlocksCounterDecrementOnDeletedBlock()
+      throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
+    cluster.waitActive();
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      // 1. create a file
+      Path filePath = new Path("/tmp.txt");
+      DFSTestUtil.createFile(dfs, filePath, 1024, (short) 3, 0L);
+
+      // 2. disable the heartbeats
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+      }
+
+      DatanodeManager datanodeManager =
+          cluster.getNamesystem().getBlockManager().getDatanodeManager();
+      ArrayList<DatanodeDescriptor> dnList =
+          new ArrayList<DatanodeDescriptor>();
+      datanodeManager.fetchDatanodes(dnList, dnList, false);
+
+      // 3. mark two replicas of the block as corrupt
+      LocatedBlock block = NameNodeAdapter
+          .getBlockLocations(cluster.getNameNode(), filePath.toString(), 0, 1)
+          .get(0);
+      DatanodeInfo[] locs = block.getLocations();
+      cluster.getNamesystem().writeLock();
+      try {
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[0], "STORAGE_ID",
+            "TEST");
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[1], "STORAGE_ID",
+            "TEST");
+        BlockManagerTestUtil.computeAllPendingWork(bm);
+        BlockManagerTestUtil.updateState(bm);
+        assertEquals(1L, bm.getPendingReconstructionBlocksCount());
+      } finally {
+        cluster.getNamesystem().writeUnlock();
+      }
+
+      // 4. delete the file
+      dfs.delete(filePath, true);
+      int blocksScheduled = 0;
+      for (DatanodeDescriptor descriptor : dnList) {
+        if (descriptor.getBlocksScheduled() != 0) {
+          blocksScheduled += descriptor.getBlocksScheduled();
+        }
+      }
+      assertEquals(0, blocksScheduled);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java

@@ -85,8 +85,7 @@ public class TestPendingReconstruction {
       BlockInfo block = genBlockInfo(i, i, 0);
       DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
       System.arraycopy(storages, 0, targets, 0, i);
-      pendingReconstructions.increment(block,
-          DatanodeStorageInfo.toDatanodeDescriptors(targets));
+      pendingReconstructions.increment(block, targets);
     }
     assertEquals("Size of pendingReconstruction ",
         10, pendingReconstructions.size());
@@ -96,25 +95,24 @@ public class TestPendingReconstruction {
     // remove one item
     //
     BlockInfo blk = genBlockInfo(8, 8, 0);
-    pendingReconstructions.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
+    pendingReconstructions.decrement(blk, storages[7]); // removes one replica
     assertEquals("pendingReconstructions.getNumReplicas ",
         7, pendingReconstructions.getNumReplicas(blk));

     //
     // insert the same item twice should be counted as once
     //
-    pendingReconstructions.increment(blk, storages[0].getDatanodeDescriptor());
+    pendingReconstructions.increment(blk, storages[0]);
     assertEquals("pendingReconstructions.getNumReplicas ",
         7, pendingReconstructions.getNumReplicas(blk));

     for (int i = 0; i < 7; i++) {
       // removes all replicas
-      pendingReconstructions.decrement(blk, storages[i].getDatanodeDescriptor());
+      pendingReconstructions.decrement(blk, storages[i]);
     }
     assertTrue(pendingReconstructions.size() == 9);
     pendingReconstructions.increment(blk,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(8)));
+        DFSTestUtil.createDatanodeStorageInfos(8));
     assertTrue(pendingReconstructions.size() == 10);

     //
@@ -144,8 +142,7 @@ public class TestPendingReconstruction {
     for (int i = 10; i < 15; i++) {
       BlockInfo block = genBlockInfo(i, i, 0);
       pendingReconstructions.increment(block,
-          DatanodeStorageInfo.toDatanodeDescriptors(
-              DFSTestUtil.createDatanodeStorageInfos(i)));
+          DFSTestUtil.createDatanodeStorageInfos(i));
     }
     assertEquals(15, pendingReconstructions.size());
     assertEquals(0L, pendingReconstructions.getNumTimedOuts());
@@ -213,8 +210,7 @@ public class TestPendingReconstruction {
     blockInfo = new BlockInfoContiguous(block, (short) 3);
     pendingReconstruction.increment(blockInfo,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(1)));
+        DFSTestUtil.createDatanodeStorageInfos(1));

     BlockCollection bc = Mockito.mock(BlockCollection.class);
     // Place into blocksmap with GenerationStamp = 1
     blockInfo.setGenerationStamp(1);
@@ -230,8 +226,7 @@ public class TestPendingReconstruction {
     block = new Block(2, 2, 0);
     blockInfo = new BlockInfoContiguous(block, (short) 3);
     pendingReconstruction.increment(blockInfo,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(1)));
+        DFSTestUtil.createDatanodeStorageInfos(1));

     // verify 2 blocks in pendingReconstructions
     assertEquals("Size of pendingReconstructions ", 2,
@@ -277,7 +272,8 @@ public class TestPendingReconstruction {
         getDatanodes().iterator().next() };

     // Add a stored block to the pendingReconstruction.
-    pendingReconstruction.increment(storedBlock, desc);
+    pendingReconstruction.increment(blockInfo,
+        DFSTestUtil.createDatanodeStorageInfos(1));
     assertEquals("Size of pendingReconstructions ", 1,
         pendingReconstruction.size());
@@ -306,6 +302,8 @@ public class TestPendingReconstruction {
       fsn.writeUnlock();
     }

+    GenericTestUtils.waitFor(() -> pendingReconstruction.size() == 0, 500,
+        10000);
     // The pending queue should be empty.
     assertEquals("Size of pendingReconstructions ", 0,
         pendingReconstruction.size());
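
The waitFor added above polls instead of asserting immediately, since the pending queue is drained asynchronously by the timeout-processing thread. For reference, the polling idiom that GenericTestUtils.waitFor provides in Hadoop's test utilities, as a self-contained toy version:

import java.util.function.Supplier;

// Toy stand-in for GenericTestUtils.waitFor(check, intervalMs, timeoutMs):
// re-evaluate the condition until it holds or the deadline passes.
class WaitForSketch {
  static void waitFor(Supplier<Boolean> check, long intervalMs, long timeoutMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!check.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException("timed out waiting for condition");
      }
      Thread.sleep(intervalMs);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    waitFor(() -> System.currentTimeMillis() - start > 100, 10, 1000);
    System.out.println("condition met");
  }
}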