HDFS-6362. Merge r1595056 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1595066 13f79535-47bb-0310-9956-ffa450edef68
Author: Arpit Agarwal
Date:   2014-05-15 22:41:01 +00:00
parent 8970cf4d86
commit b8c97091cc

3 changed files with 41 additions and 44 deletions
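
At a glance: InvalidateBlocks (the NameNode's map of replicas pending deletion) was keyed by storage-ID strings, while some call sites passed a DatanodeUuid instead, so lookups could silently miss. The patch keys the map by the datanode itself. Below is a minimal, self-contained sketch of the new keying; NodeKey, PendingInvalidations, and the Long block IDs are simplified stand-ins, not the real Hadoop types.

    import java.util.HashSet;
    import java.util.Set;
    import java.util.TreeMap;

    // Stand-in for DatanodeInfo: one key per datanode, ordered for TreeMap use.
    class NodeKey implements Comparable<NodeKey> {
      final String xferAddr;  // host:port of the datanode
      NodeKey(String xferAddr) { this.xferAddr = xferAddr; }
      @Override
      public int compareTo(NodeKey o) { return xferAddr.compareTo(o.xferAddr); }
    }

    // Sketch of the re-keyed map: one entry per node, no string ambiguity.
    class PendingInvalidations {
      private final TreeMap<NodeKey, Set<Long>> node2blocks =
          new TreeMap<NodeKey, Set<Long>>();

      synchronized void add(NodeKey dn, long blockId) {
        Set<Long> set = node2blocks.get(dn);
        if (set == null) {
          set = new HashSet<Long>();
          node2blocks.put(dn, set);
        }
        set.add(blockId);
      }

      synchronized boolean contains(NodeKey dn, long blockId) {
        final Set<Long> s = node2blocks.get(dn);
        return s != null && s.contains(blockId);
      }
    }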

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -308,6 +308,9 @@ Release 2.4.1 - UNRELEASED
     HDFS-4052. BlockManager#invalidateWork should print log outside the lock.
     (Jing Zhao via suresh)
 
+    HDFS-6362. InvalidateBlocks is inconsistent in usage of DatanodeUuid and
+    StorageID. (Arpit Agarwal)
+
 Release 2.4.0 - 2014-04-07
 
   INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@ -265,7 +265,8 @@ public class BlockManager {
final long pendingPeriod = conf.getLong( final long pendingPeriod = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY, DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT); DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT);
invalidateBlocks = new InvalidateBlocks(datanodeManager, pendingPeriod); invalidateBlocks = new InvalidateBlocks(
datanodeManager.blockInvalidateLimit, pendingPeriod);
// Compute the map capacity by allocating 2% of total memory // Compute the map capacity by allocating 2% of total memory
blocksMap = new BlocksMap( blocksMap = new BlocksMap(
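
The constructor hunk above also narrows a dependency: InvalidateBlocks previously received the whole DatanodeManager but, as the later hunks show, used it only for blockInvalidateLimit and for resolving storage IDs to nodes. Once the map is keyed by datanode, the integer limit is all it needs. A hedged sketch of the narrowed constructor (a stand-in class, not the real one):

    // Stand-in sketch: inject the one value needed instead of the manager,
    // which keeps the class constructible in tests without a DatanodeManager.
    class InvalidateBlocksSketch {
      private final int blockInvalidateLimit;  // max blocks per heartbeat reply
      private final long pendingPeriodInMs;    // deletion delay after startup

      InvalidateBlocksSketch(int blockInvalidateLimit, long pendingPeriodInMs) {
        this.blockInvalidateLimit = blockInvalidateLimit;
        this.pendingPeriodInMs = pendingPeriodInMs;
      }
    }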
@@ -701,7 +702,7 @@ public class BlockManager {
       // remove this block from the list of pending blocks to be deleted.
       for (DatanodeStorageInfo storage : targets) {
-        invalidateBlocks.remove(storage.getStorageID(), oldBlock);
+        invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
       }
 
       // Adjust safe-mode totals, since under-construction blocks don't
@@ -726,7 +727,7 @@ public class BlockManager {
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       final String storageID = storage.getStorageID();
       // filter invalidate replicas
-      if(!invalidateBlocks.contains(storageID, block)) {
+      if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
         locations.add(storage);
       }
     }
@@ -1016,7 +1017,7 @@ public class BlockManager {
       pendingDNMessages.removeAllMessagesForDatanode(node);
       node.resetBlocks();
-      invalidateBlocks.remove(node.getDatanodeUuid());
+      invalidateBlocks.remove(node);
 
       // If the DN hasn't block-reported since the most recent
       // failover, then we may have been holding up on processing
@@ -1184,7 +1185,7 @@ public class BlockManager {
    * @return total number of block for deletion
    */
   int computeInvalidateWork(int nodesToProcess) {
-    final List<String> nodes = invalidateBlocks.getStorageIDs();
+    final List<DatanodeInfo> nodes = invalidateBlocks.getDatanodes();
     Collections.shuffle(nodes);
     nodesToProcess = Math.min(nodes.size(), nodesToProcess);
@@ -1976,7 +1977,7 @@ public class BlockManager {
     }
 
     // Ignore replicas already scheduled to be removed from the DN
-    if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) {
+    if(invalidateBlocks.contains(dn, block)) {
       /*
        * TODO: following assertion is incorrect, see HDFS-2668 assert
        * storedBlock.findDatanode(dn) < 0 : "Block " + block +
@@ -3202,9 +3203,8 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for removal during this iteration.
    */
-  private int invalidateWorkForOneNode(String nodeId) {
+  private int invalidateWorkForOneNode(DatanodeInfo dn) {
     final List<Block> toInvalidate;
-    final DatanodeDescriptor dn;
 
     namesystem.writeLock();
     try {
@@ -3213,15 +3213,13 @@
         LOG.debug("In safemode, not computing replication work");
         return 0;
       }
-      // get blocks to invalidate for the nodeId
-      assert nodeId != null;
-      dn = datanodeManager.getDatanode(nodeId);
-      if (dn == null) {
-        invalidateBlocks.remove(nodeId);
-        return 0;
-      }
-      toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn);
-      if (toInvalidate == null) {
-        return 0;
-      }
+      try {
+        toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn));
+
+        if (toInvalidate == null) {
+          return 0;
+        }
+      } catch(UnregisteredNodeException une) {
+        return 0;
+      }
     } finally {
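
The rewritten invalidateWorkForOneNode drops the manual nodeId-to-descriptor resolution, null check, and map cleanup; an unknown node now surfaces as UnregisteredNodeException from datanodeManager.getDatanode and is treated the same as having no work. A self-contained sketch of that control flow, with interfaces and String/Long standing in for the real HDFS types:

    import java.util.List;

    // Stand-ins for the real classes; the behavior noted in comments is what
    // the diff above relies on, not a documented API contract.
    class UnregisteredNodeException extends Exception {}

    interface DatanodeLookup {
      // Assumed to throw when the node is no longer registered.
      String getDatanode(String node) throws UnregisteredNodeException;
    }

    interface InvalidationQueue {
      // Returns null when nothing is queued for this node.
      List<Long> invalidateWork(String descriptor);
    }

    class InvalidateFlow {
      static int invalidateWorkForOneNode(String node, DatanodeLookup lookup,
          InvalidationQueue queue) {
        final List<Long> toInvalidate;
        try {
          // One call resolves the node and pulls its pending blocks; the old
          // code null-checked the descriptor and cleaned the map by hand.
          toInvalidate = queue.invalidateWork(lookup.getDatanode(node));
          if (toInvalidate == null) {
            return 0;  // nothing queued for this node
          }
        } catch (UnregisteredNodeException une) {
          return 0;    // unknown node: treated as no work
        }
        return toInvalidate.size();
      }
    }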

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java

@@ -44,13 +44,13 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 class InvalidateBlocks {
-  /** Mapping: StorageID -> Collection of Blocks */
-  private final Map<String, LightWeightHashSet<Block>> node2blocks =
-      new TreeMap<String, LightWeightHashSet<Block>>();
+  /** Mapping: DatanodeInfo -> Collection of Blocks */
+  private final Map<DatanodeInfo, LightWeightHashSet<Block>> node2blocks =
+      new TreeMap<DatanodeInfo, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
-  private final DatanodeManager datanodeManager;
+  private final int blockInvalidateLimit;
 
   /**
    * The period of pending time for block invalidation since the NameNode
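
One detail behind the re-keyed map above: a TreeMap built without an explicit Comparator needs Comparable keys. That holds here because DatanodeInfo extends DatanodeID, which implements Comparable, ordering nodes by transfer address, roughly:

    // Paraphrased from DatanodeID; this is what makes the
    // TreeMap<DatanodeInfo, ...> declaration above legal.
    public int compareTo(DatanodeID that) {
      return getXferAddr().compareTo(that.getXferAddr());
    }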
@@ -60,8 +60,8 @@ class InvalidateBlocks {
   /** the startup time */
   private final long startupTime = Time.monotonicNow();
 
-  InvalidateBlocks(final DatanodeManager datanodeManager, long pendingPeriodInMs) {
-    this.datanodeManager = datanodeManager;
+  InvalidateBlocks(final int blockInvalidateLimit, long pendingPeriodInMs) {
+    this.blockInvalidateLimit = blockInvalidateLimit;
     this.pendingPeriodInMs = pendingPeriodInMs;
     printBlockDeletionTime(BlockManager.LOG);
   }
@@ -86,12 +86,9 @@ class InvalidateBlocks {
    * invalidation. Blocks are compared including their generation stamps:
    * if a block is pending invalidation but with a different generation stamp,
    * returns false.
-   * @param storageID the storage to check
-   * @param the block to look for
-   *
    */
-  synchronized boolean contains(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> s = node2blocks.get(storageID);
+  synchronized boolean contains(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> s = node2blocks.get(dn);
     if (s == null) {
       return false; // no invalidate blocks for this storage ID
     }
@@ -106,10 +103,10 @@ class InvalidateBlocks {
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode);
     if (set == null) {
       set = new LightWeightHashSet<Block>();
-      node2blocks.put(datanode.getDatanodeUuid(), set);
+      node2blocks.put(datanode, set);
     }
     if (set.add(block)) {
       numBlocks++;
@@ -121,20 +118,20 @@ class InvalidateBlocks {
   }
 
   /** Remove a storage from the invalidatesSet */
-  synchronized void remove(final String storageID) {
-    final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID);
+  synchronized void remove(final DatanodeInfo dn) {
+    final LightWeightHashSet<Block> blocks = node2blocks.remove(dn);
     if (blocks != null) {
       numBlocks -= blocks.size();
     }
   }
 
   /** Remove the block from the specified storage. */
-  synchronized void remove(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> v = node2blocks.get(storageID);
+  synchronized void remove(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> v = node2blocks.get(dn);
     if (v != null && v.remove(block)) {
       numBlocks--;
       if (v.isEmpty()) {
-        node2blocks.remove(storageID);
+        node2blocks.remove(dn);
       }
     }
   }
@@ -148,18 +145,18 @@ class InvalidateBlocks {
       return;
     }
 
-    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+    for(Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
       final LightWeightHashSet<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
-        out.println(datanodeManager.getDatanode(entry.getKey()));
+        out.println(entry.getKey());
         out.println(blocks);
       }
     }
   }
 
   /** @return a list of the storage IDs. */
-  synchronized List<String> getStorageIDs() {
-    return new ArrayList<String>(node2blocks.keySet());
+  synchronized List<DatanodeInfo> getDatanodes() {
+    return new ArrayList<DatanodeInfo>(node2blocks.keySet());
   }
 
   /**
@@ -170,8 +167,7 @@ class InvalidateBlocks {
     return pendingPeriodInMs - (Time.monotonicNow() - startupTime);
   }
 
-  synchronized List<Block> invalidateWork(
-      final String storageId, final DatanodeDescriptor dn) {
+  synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
     final long delay = getInvalidationDelay();
     if (delay > 0) {
       if (BlockManager.LOG.isDebugEnabled()) {
@@ -181,18 +177,18 @@
       }
       return null;
     }
 
-    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
+    final LightWeightHashSet<Block> set = node2blocks.get(dn);
     if (set == null) {
       return null;
     }
 
     // # blocks that can be sent in one message is limited
-    final int limit = datanodeManager.blockInvalidateLimit;
+    final int limit = blockInvalidateLimit;
     final List<Block> toInvalidate = set.pollN(limit);
 
     // If we send everything in this message, remove this node entry
     if (set.isEmpty()) {
-      remove(storageId);
+      remove(dn);
     }
     dn.addBlocksToBeInvalidated(toInvalidate);
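
Taken together, invalidateWork() drains one node's queue per call: pull at most blockInvalidateLimit blocks, drop the node's map entry once it empties, and hand the batch to the descriptor. A self-contained approximation of that draining step; LightWeightHashSet.pollN is imitated with an iterator loop, and String/Long stand in for the node and block types:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeMap;

    class DrainSketch {
      private final TreeMap<String, Set<Long>> node2blocks =
          new TreeMap<String, Set<Long>>();
      private final int blockInvalidateLimit;

      DrainSketch(int blockInvalidateLimit) {
        this.blockInvalidateLimit = blockInvalidateLimit;
      }

      // Mirrors add() in the class above: get-or-create the node's set.
      synchronized void add(String dn, long blockId) {
        Set<Long> set = node2blocks.get(dn);
        if (set == null) {
          set = new HashSet<Long>();
          node2blocks.put(dn, set);
        }
        set.add(blockId);
      }

      synchronized List<Long> invalidateWork(String dn) {
        final Set<Long> set = node2blocks.get(dn);
        if (set == null) {
          return null;  // nothing queued for this node
        }
        // Imitation of pollN(limit): remove and return up to limit blocks.
        final List<Long> toInvalidate = new ArrayList<Long>();
        final Iterator<Long> it = set.iterator();
        while (it.hasNext() && toInvalidate.size() < blockInvalidateLimit) {
          toInvalidate.add(it.next());
          it.remove();
        }
        if (set.isEmpty()) {
          node2blocks.remove(dn);  // no empty entries linger in the map
        }
        return toInvalidate;
      }
    }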