HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport latency. Contributed by Ming Ma.

This commit is contained in:
Kihwal Lee 2014-12-16 10:30:22 -06:00
parent 07bb0b0bbb
commit b7923a356e
8 changed files with 124 additions and 49 deletions

View File

@ -595,6 +595,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7516. Fix findbugs warnings in hdfs-nfs project. (brandonli)
HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport
latency. (Ming Ma via kihwal)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -325,6 +325,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
// Number of blocks to rescan for each iteration of postponedMisreplicatedBlocks.
public static final String DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY = "dfs.namenode.blocks.per.postponedblocks.rescan";
public static final long DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT = 10000;
// Replication monitoring related keys
public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
"dfs.namenode.invalidate.work.pct.per.iteration";

View File

@ -1050,21 +1050,6 @@ public class BlockManager {
node.resetBlocks();
invalidateBlocks.remove(node);
// If the DN hasn't block-reported since the most recent
// failover, then we may have been holding up on processing
// over-replicated blocks because of it. But we can now
// process those blocks.
boolean stale = false;
for(DatanodeStorageInfo storage : node.getStorageInfos()) {
if (storage.areBlockContentsStale()) {
stale = true;
break;
}
}
if (stale) {
rescanPostponedMisreplicatedBlocks();
}
}
/** Remove the blocks associated to the given DatanodeStorageInfo. */
@ -1818,17 +1803,7 @@ public class BlockManager {
invalidatedBlocks = processReport(storageInfo, newReport);
}
// Now that we have an up-to-date block report, we know that any
// deletions from a previous NN iteration have been accounted for.
boolean staleBefore = storageInfo.areBlockContentsStale();
storageInfo.receivedBlockReport();
if (staleBefore && !storageInfo.areBlockContentsStale()) {
LOG.info("BLOCK* processReport: Received first block report from "
+ storage + " after starting up or becoming active. Its block "
+ "contents are no longer considered stale");
rescanPostponedMisreplicatedBlocks();
}
} finally {
endTime = Time.now();
namesystem.writeUnlock();
@ -1857,31 +1832,74 @@ public class BlockManager {
/**
* Rescan the list of blocks which were previously postponed.
*/
private void rescanPostponedMisreplicatedBlocks() {
for (Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
it.hasNext();) {
Block b = it.next();
BlockInfo bi = blocksMap.getStoredBlock(b);
if (bi == null) {
void rescanPostponedMisreplicatedBlocks() {
if (getPostponedMisreplicatedBlocksCount() == 0) {
return;
}
long startTimeRescanPostponedMisReplicatedBlocks = Time.now();
long startPostponedMisReplicatedBlocksCount =
getPostponedMisreplicatedBlocksCount();
namesystem.writeLock();
try {
// blocksPerRescan is the configured number of blocks per rescan.
// Randomly select blocksPerRescan consecutive blocks from the HashSet
// when the number of blocks remaining is larger than blocksPerRescan.
// The reason we don't always pick the first blocksPerRescan blocks is to
// handle the case if for some reason some datanodes remain in
// content stale state for a long time and only impact the first
// blocksPerRescan blocks.
int i = 0;
long startIndex = 0;
long blocksPerRescan =
datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
if (base > 0) {
startIndex = DFSUtil.getRandom().nextLong() % (base+1);
if (startIndex < 0) {
startIndex += (base+1);
}
}
Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
for (int tmp = 0; tmp < startIndex; tmp++) {
it.next();
}
for (;it.hasNext(); i++) {
Block b = it.next();
if (i >= blocksPerRescan) {
break;
}
BlockInfo bi = blocksMap.getStoredBlock(b);
if (bi == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
"Postponed mis-replicated block " + b + " no longer found " +
"in block map.");
}
it.remove();
postponedMisreplicatedBlocksCount.decrementAndGet();
continue;
}
MisReplicationResult res = processMisReplicatedBlock(bi);
if (LOG.isDebugEnabled()) {
LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
"Postponed mis-replicated block " + b + " no longer found " +
"in block map.");
"Re-scanned block " + b + ", result is " + res);
}
if (res != MisReplicationResult.POSTPONE) {
it.remove();
postponedMisreplicatedBlocksCount.decrementAndGet();
}
it.remove();
postponedMisreplicatedBlocksCount.decrementAndGet();
continue;
}
MisReplicationResult res = processMisReplicatedBlock(bi);
if (LOG.isDebugEnabled()) {
LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
"Re-scanned block " + b + ", result is " + res);
}
if (res != MisReplicationResult.POSTPONE) {
it.remove();
postponedMisreplicatedBlocksCount.decrementAndGet();
}
} finally {
namesystem.writeUnlock();
long endPostponedMisReplicatedBlocksCount =
getPostponedMisreplicatedBlocksCount();
LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
(Time.now() - startTimeRescanPostponedMisReplicatedBlocks) +
" msecs. " + endPostponedMisReplicatedBlocksCount +
" blocks are left. " + (startPostponedMisReplicatedBlocksCount -
endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
}
}
@ -3580,6 +3598,7 @@ public class BlockManager {
if (namesystem.isPopulatingReplQueues()) {
computeDatanodeWork();
processPendingReplications();
rescanPostponedMisreplicatedBlocks();
}
Thread.sleep(replicationRecheckInterval);
} catch (Throwable t) {
@ -3648,6 +3667,8 @@ public class BlockManager {
excessReplicateMap.clear();
invalidateBlocks.clear();
datanodeManager.clearPendingQueues();
postponedMisreplicatedBlocks.clear();
postponedMisreplicatedBlocksCount.set(0);
};

View File

@ -133,13 +133,18 @@ public class DatanodeManager {
* writing to stale datanodes, i.e., continue using stale nodes for writing.
*/
private final float ratioUseStaleDataNodesForWrite;
/** The number of stale DataNodes */
private volatile int numStaleNodes;
/** The number of stale storages */
private volatile int numStaleStorages;
/**
* Number of blocks to check for each postponedMisreplicatedBlocks iteration
*/
private final long blocksPerPostponedMisreplicatedBlocksRescan;
/**
* Whether or not this cluster has ever consisted of more than 1 rack,
* according to the NetworkTopology.
@ -259,6 +264,9 @@ public class DatanodeManager {
this.timeBetweenResendingCachingDirectivesMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT);
this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
}
private static long getStaleIntervalFromConf(Configuration conf,
@ -1133,6 +1141,10 @@ public class DatanodeManager {
* ratioUseStaleDataNodesForWrite);
}
public long getBlocksPerPostponedMisreplicatedBlocksRescan() {
return blocksPerPostponedMisreplicatedBlocksRescan;
}
/**
* @return The time interval used to mark DataNodes as stale.
*/

View File

@ -2253,4 +2253,12 @@
</description>
</property>
<property>
<name>dfs.namenode.blocks.per.postponedblocks.rescan</name>
<value>10000</value>
<description>Number of blocks to rescan for each iteration of
postponedMisreplicatedBlocks.
</description>
</property>
</configuration>

View File

@ -238,6 +238,14 @@ public class BlockManagerTestUtil {
return dn.updateStorage(s);
}
/**
* Call heartbeat check function of HeartbeatManager
* @param bm the BlockManager to manipulate
*/
public static void rescanPostponedMisreplicatedBlocks(BlockManager bm) {
bm.rescanPostponedMisreplicatedBlocks();
}
public static DatanodeDescriptor getLocalDatanodeDescriptor(
boolean initializeStorage) {
DatanodeDescriptor dn = new DatanodeDescriptor(DFSTestUtil.getLocalDatanodeID());

View File

@ -165,7 +165,12 @@ public class TestDNFencing {
banner("Metadata after nodes have all block-reported");
doMetasave(nn2);
// Force a rescan of postponedMisreplicatedBlocks.
BlockManager nn2BM = nn2.getNamesystem().getBlockManager();
BlockManagerTestUtil.checkHeartbeat(nn2BM);
BlockManagerTestUtil.rescanPostponedMisreplicatedBlocks(nn2BM);
// The blocks should no longer be postponed.
assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
@ -251,7 +256,12 @@ public class TestDNFencing {
banner("Metadata after nodes have all block-reported");
doMetasave(nn2);
// Force a rescan of postponedMisreplicatedBlocks.
BlockManager nn2BM = nn2.getNamesystem().getBlockManager();
BlockManagerTestUtil.checkHeartbeat(nn2BM);
BlockManagerTestUtil.rescanPostponedMisreplicatedBlocks(nn2BM);
// The block should no longer be postponed.
assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
@ -347,6 +357,11 @@ public class TestDNFencing {
banner("Metadata after nodes have all block-reported");
doMetasave(nn2);
// Force a rescan of postponedMisreplicatedBlocks.
BlockManager nn2BM = nn2.getNamesystem().getBlockManager();
BlockManagerTestUtil.checkHeartbeat(nn2BM);
BlockManagerTestUtil.rescanPostponedMisreplicatedBlocks(nn2BM);
// The block should no longer be postponed.
assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());

View File

@ -109,6 +109,10 @@ public class TestDNFencingWithReplication {
HAStressTestHarness harness = new HAStressTestHarness();
harness.conf.setInt(
DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
harness.conf.setInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
harness.conf.setInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
final MiniDFSCluster cluster = harness.startCluster();
try {