From b8c97091cc17597744e8b3d128d2da34a5667f28 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Thu, 15 May 2014 22:41:01 +0000
Subject: [PATCH] HDFS-6362. Merge r1595056 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1595066 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 ++
 .../server/blockmanagement/BlockManager.java  | 32 ++++++------
 .../blockmanagement/InvalidateBlocks.java     | 50 +++++++++----------
 3 files changed, 41 insertions(+), 44 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1c1cd46c949..649e376cde6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -308,6 +308,9 @@ Release 2.4.1 - UNRELEASED
     HDFS-4052. BlockManager#invalidateWork should print log outside the lock.
     (Jing Zhao via suresh)
 
+    HDFS-6362. InvalidateBlocks is inconsistent in usage of DatanodeUuid and
+    StorageID. (Arpit Agarwal)
+
 Release 2.4.0 - 2014-04-07
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index cc8a4642cf9..543dea0cefe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -265,7 +265,8 @@ public class BlockManager {
     final long pendingPeriod = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY,
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT);
-    invalidateBlocks = new InvalidateBlocks(datanodeManager, pendingPeriod);
+    invalidateBlocks = new InvalidateBlocks(
+        datanodeManager.blockInvalidateLimit, pendingPeriod);
 
     // Compute the map capacity by allocating 2% of total memory
     blocksMap = new BlocksMap(
@@ -701,7 +702,7 @@ public class BlockManager {
 
     // remove this block from the list of pending blocks to be deleted. 
     for (DatanodeStorageInfo storage : targets) {
-      invalidateBlocks.remove(storage.getStorageID(), oldBlock);
+      invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
     }
 
     // Adjust safe-mode totals, since under-construction blocks don't
@@ -726,7 +727,7 @@ public class BlockManager {
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       final String storageID = storage.getStorageID();
       // filter invalidate replicas
-      if(!invalidateBlocks.contains(storageID, block)) {
+      if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
         locations.add(storage);
       }
     }
@@ -1016,7 +1017,7 @@ public class BlockManager {
     pendingDNMessages.removeAllMessagesForDatanode(node);
 
     node.resetBlocks();
-    invalidateBlocks.remove(node.getDatanodeUuid());
+    invalidateBlocks.remove(node);
 
     // If the DN hasn't block-reported since the most recent
     // failover, then we may have been holding up on processing
@@ -1184,7 +1185,7 @@ public class BlockManager {
    * @return total number of block for deletion
    */
   int computeInvalidateWork(int nodesToProcess) {
-    final List<String> nodes = invalidateBlocks.getStorageIDs();
+    final List<DatanodeInfo> nodes = invalidateBlocks.getDatanodes();
     Collections.shuffle(nodes);
 
     nodesToProcess = Math.min(nodes.size(), nodesToProcess);
@@ -1976,7 +1977,7 @@ public class BlockManager {
     }
 
     // Ignore replicas already scheduled to be removed from the DN
-    if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) {
+    if(invalidateBlocks.contains(dn, block)) {
       /*
        * TODO: following assertion is incorrect, see HDFS-2668 assert
        * storedBlock.findDatanode(dn) < 0 : "Block " + block +
@@ -3202,9 +3203,8 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for removal during this iteration.
    */
-  private int invalidateWorkForOneNode(String nodeId) {
+  private int invalidateWorkForOneNode(DatanodeInfo dn) {
     final List<Block> toInvalidate;
-    final DatanodeDescriptor dn;
 
     namesystem.writeLock();
     try {
@@ -3213,15 +3213,13 @@
         LOG.debug("In safemode, not computing replication work");
         return 0;
       }
-      // get blocks to invalidate for the nodeId
-      assert nodeId != null;
-      dn = datanodeManager.getDatanode(nodeId);
-      if (dn == null) {
-        invalidateBlocks.remove(nodeId);
-        return 0;
-      }
-      toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn);
-      if (toInvalidate == null) {
+      try {
+        toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn));
+
+        if (toInvalidate == null) {
+          return 0;
+        }
+      } catch(UnregisteredNodeException une) {
         return 0;
       }
     } finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
index 9212dd5f0fc..87fa774f54b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
@@ -44,13 +44,13 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 class InvalidateBlocks {
-  /** Mapping: StorageID -> Collection of Blocks */
-  private final Map<String, LightWeightHashSet<Block>> node2blocks =
-      new TreeMap<String, LightWeightHashSet<Block>>();
+  /** Mapping: DatanodeInfo -> Collection of Blocks */
+  private final Map<DatanodeInfo, LightWeightHashSet<Block>> node2blocks =
+      new TreeMap<DatanodeInfo, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
 
-  private final DatanodeManager datanodeManager;
+  private final int blockInvalidateLimit;
 
   /**
    * The period of pending time for block invalidation since the NameNode
@@ -60,8 +60,8 @@ class InvalidateBlocks {
   /** the startup time */
   private final long startupTime = Time.monotonicNow();
 
-  InvalidateBlocks(final DatanodeManager datanodeManager, long pendingPeriodInMs) {
-    this.datanodeManager = datanodeManager;
+  InvalidateBlocks(final int blockInvalidateLimit, long pendingPeriodInMs) {
+    this.blockInvalidateLimit = blockInvalidateLimit;
     this.pendingPeriodInMs = pendingPeriodInMs;
     printBlockDeletionTime(BlockManager.LOG);
   }
@@ -86,12 +86,9 @@ class InvalidateBlocks {
    * invalidation. Blocks are compared including their generation stamps:
    * if a block is pending invalidation but with a different generation stamp,
    * returns false.
-   * @param storageID the storage to check
-   * @param the block to look for
-   *
    */
-  synchronized boolean contains(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> s = node2blocks.get(storageID);
+  synchronized boolean contains(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> s = node2blocks.get(dn);
     if (s == null) {
       return false; // no invalidate blocks for this storage ID
     }
@@ -106,10 +103,10 @@
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode);
     if (set == null) {
       set = new LightWeightHashSet<Block>();
-      node2blocks.put(datanode.getDatanodeUuid(), set);
+      node2blocks.put(datanode, set);
     }
     if (set.add(block)) {
       numBlocks++;
@@ -121,20 +118,20 @@
   }
 
   /** Remove a storage from the invalidatesSet */
-  synchronized void remove(final String storageID) {
-    final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID);
+  synchronized void remove(final DatanodeInfo dn) {
+    final LightWeightHashSet<Block> blocks = node2blocks.remove(dn);
     if (blocks != null) {
       numBlocks -= blocks.size();
     }
   }
 
   /** Remove the block from the specified storage. */
-  synchronized void remove(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> v = node2blocks.get(storageID);
+  synchronized void remove(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> v = node2blocks.get(dn);
     if (v != null && v.remove(block)) {
       numBlocks--;
       if (v.isEmpty()) {
-        node2blocks.remove(storageID);
+        node2blocks.remove(dn);
       }
     }
   }
@@ -148,18 +145,18 @@
       return;
     }
 
-    for(Map.Entry<String, LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+    for(Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
       final LightWeightHashSet<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
-        out.println(datanodeManager.getDatanode(entry.getKey()));
+        out.println(entry.getKey());
         out.println(blocks);
       }
     }
   }
 
   /** @return a list of the storage IDs. */
-  synchronized List<String> getStorageIDs() {
-    return new ArrayList<String>(node2blocks.keySet());
+  synchronized List<DatanodeInfo> getDatanodes() {
+    return new ArrayList<DatanodeInfo>(node2blocks.keySet());
   }
 
   /**
@@ -170,8 +167,7 @@
     return pendingPeriodInMs - (Time.monotonicNow() - startupTime);
   }
 
-  synchronized List<Block> invalidateWork(
-      final String storageId, final DatanodeDescriptor dn) {
+  synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
     final long delay = getInvalidationDelay();
     if (delay > 0) {
       if (BlockManager.LOG.isDebugEnabled()) {
@@ -181,18 +177,18 @@
       }
       return null;
     }
-    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
+    final LightWeightHashSet<Block> set = node2blocks.get(dn);
    if (set == null) {
      return null;
    }
 
     // # blocks that can be sent in one message is limited
-    final int limit = datanodeManager.blockInvalidateLimit;
+    final int limit = blockInvalidateLimit;
     final List<Block> toInvalidate = set.pollN(limit);
 
     // If we send everything in this message, remove this node entry
     if (set.isEmpty()) {
-      remove(storageId);
+      remove(dn);
     }
 
     dn.addBlocksToBeInvalidated(toInvalidate);
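
Illustrative sketch (not part of the patch): the change above re-keys node2blocks from a String key, which call sites treated inconsistently as a DatanodeUuid or a StorageID, to the datanode object itself, and it injects blockInvalidateLimit at construction instead of keeping a back-reference to DatanodeManager. The self-contained Java below (Java 16+ for records) mirrors that shape; Node and Block are hypothetical stand-ins, not the real Hadoop DatanodeInfo and Block classes, and pollWork() only approximates LightWeightHashSet#pollN.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class InvalidateBlocksSketch {

  // Hypothetical stand-in for DatanodeInfo; comparable so it can key the
  // TreeMap, just as node2blocks is keyed after this patch.
  record Node(String datanodeUuid) implements Comparable<Node> {
    @Override
    public int compareTo(Node o) {
      return datanodeUuid.compareTo(o.datanodeUuid);
    }
  }

  // Hypothetical stand-in for the HDFS Block class; record equality covers
  // the generation stamp, matching the contains() javadoc above.
  record Block(long blockId, long genStamp) {}

  // Keyed by the node object, so there is no question of whether a key
  // string is a DatanodeUuid or a StorageID.
  private final Map<Node, Set<Block>> node2blocks = new TreeMap<>();
  private long numBlocks = 0L;

  // Injected at construction, replacing the DatanodeManager back-reference.
  private final int blockInvalidateLimit;

  InvalidateBlocksSketch(int blockInvalidateLimit) {
    this.blockInvalidateLimit = blockInvalidateLimit;
  }

  // Mirrors add(): create the per-node set lazily, count only new blocks.
  synchronized void add(Node node, Block block) {
    Set<Block> set = node2blocks.computeIfAbsent(node, n -> new HashSet<>());
    if (set.add(block)) {
      numBlocks++;
    }
  }

  // Mirrors contains(): membership is checked against the node key.
  synchronized boolean contains(Node node, Block block) {
    Set<Block> s = node2blocks.get(node);
    return s != null && s.contains(block);
  }

  // Approximates invalidateWork(): pull at most blockInvalidateLimit blocks
  // for one node, dropping the node's entry once it is drained.
  synchronized List<Block> pollWork(Node node) {
    Set<Block> set = node2blocks.get(node);
    if (set == null) {
      return null;
    }
    List<Block> toInvalidate = new ArrayList<>();
    Iterator<Block> it = set.iterator();
    while (it.hasNext() && toInvalidate.size() < blockInvalidateLimit) {
      toInvalidate.add(it.next());
      it.remove();
    }
    numBlocks -= toInvalidate.size();
    if (set.isEmpty()) {
      node2blocks.remove(node);
    }
    return toInvalidate;
  }

  public static void main(String[] args) {
    InvalidateBlocksSketch blocks = new InvalidateBlocksSketch(2);
    Node dn = new Node("dn-uuid-1");
    blocks.add(dn, new Block(1, 1001));
    blocks.add(dn, new Block(2, 1001));
    blocks.add(dn, new Block(3, 1001));
    System.out.println(blocks.pollWork(dn).size()); // 2 (capped by limit)
    System.out.println(blocks.pollWork(dn).size()); // 1 (drains the entry)
    System.out.println(blocks.pollWork(dn));        // null (entry removed)
  }
}

Keying the map by the node object makes it impossible for one caller to file blocks under a DatanodeUuid while another looks them up by StorageID, which is the inconsistency named in the JIRA title; it also lets remove(DatanodeInfo) drop a dead node's whole entry without first resolving an identifier string.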