HDFS-6362. InvalidateBlocks is inconsistent in usage of DatanodeUuid and StorageID. (Arpit Agarwal)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1595056 13f79535-47bb-0310-9956-ffa450edef68
Parent: 1959afe113
Commit: 9a0fcae5bc
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -557,6 +557,9 @@ Release 2.4.1 - UNRELEASED
     HDFS-6361. TestIdUserGroup.testUserUpdateSetting failed due to out of range
     nfsnobody Id. (Yongjun Zhang via brandonli)

+    HDFS-6362. InvalidateBlocks is inconsistent in usage of DatanodeUuid and
+    StorageID. (Arpit Agarwal)
+
 Release 2.4.0 - 2014-04-07

   INCOMPATIBLE CHANGES
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -265,7 +265,8 @@ public class BlockManager {
     final long pendingPeriod = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY,
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT);
-    invalidateBlocks = new InvalidateBlocks(datanodeManager, pendingPeriod);
+    invalidateBlocks = new InvalidateBlocks(
+        datanodeManager.blockInvalidateLimit, pendingPeriod);

     // Compute the map capacity by allocating 2% of total memory
     blocksMap = new BlocksMap(
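The hunk above narrows a dependency: InvalidateBlocks previously took the whole DatanodeManager only to read its blockInvalidateLimit field, and now receives that int directly. A minimal sketch of the pattern, with hypothetical names standing in for the Hadoop classes:

    // Hypothetical stand-ins; not the actual Hadoop types.
    public class NarrowDependencyDemo {
      static class Manager {                    // stand-in for DatanodeManager
        final int blockInvalidateLimit = 1000;
      }

      // Before: the queue holds the whole manager to read a single field.
      static class QueueBefore {
        private final Manager manager;
        QueueBefore(Manager manager) { this.manager = manager; }
        int limit() { return manager.blockInvalidateLimit; }
      }

      // After: the queue receives only the value it needs, so tests can
      // construct it without assembling a manager first.
      static class QueueAfter {
        private final int blockInvalidateLimit;
        QueueAfter(int blockInvalidateLimit) {
          this.blockInvalidateLimit = blockInvalidateLimit;
        }
        int limit() { return blockInvalidateLimit; }
      }

      public static void main(String[] args) {
        System.out.println(new QueueBefore(new Manager()).limit()); // 1000
        System.out.println(new QueueAfter(1000).limit());           // 1000
      }
    }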
@@ -701,7 +702,7 @@ public class BlockManager {

     // remove this block from the list of pending blocks to be deleted.
     for (DatanodeStorageInfo storage : targets) {
-      invalidateBlocks.remove(storage.getStorageID(), oldBlock);
+      invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
     }

     // Adjust safe-mode totals, since under-construction blocks don't
@@ -726,7 +727,7 @@ public class BlockManager {
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
-      final String storageID = storage.getStorageID();
       // filter invalidate replicas
-      if(!invalidateBlocks.contains(storageID, block)) {
+      if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
         locations.add(storage);
       }
     }
@@ -1016,7 +1017,7 @@ public class BlockManager {
     pendingDNMessages.removeAllMessagesForDatanode(node);

     node.resetBlocks();
-    invalidateBlocks.remove(node.getDatanodeUuid());
+    invalidateBlocks.remove(node);

     // If the DN hasn't block-reported since the most recent
     // failover, then we may have been holding up on processing
@@ -1184,7 +1185,7 @@ public class BlockManager {
    * @return total number of block for deletion
    */
   int computeInvalidateWork(int nodesToProcess) {
-    final List<String> nodes = invalidateBlocks.getStorageIDs();
+    final List<DatanodeInfo> nodes = invalidateBlocks.getDatanodes();
     Collections.shuffle(nodes);

     nodesToProcess = Math.min(nodes.size(), nodesToProcess);
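computeInvalidateWork shuffles the node list so successive scheduling rounds do not always drain the same datanodes first, then visits only the first nodesToProcess entries. A standalone illustration of that selection step, using made-up node names:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ShuffleSelectDemo {
      public static void main(String[] args) {
        List<String> nodes = Arrays.asList("dn1", "dn2", "dn3", "dn4", "dn5");
        // Randomize so no datanode is permanently favored across rounds.
        Collections.shuffle(nodes);
        int nodesToProcess = Math.min(nodes.size(), 2);
        // Each round schedules work for only the first few shuffled nodes.
        for (String node : nodes.subList(0, nodesToProcess)) {
          System.out.println("schedule invalidation work for " + node);
        }
      }
    }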
@@ -1973,7 +1974,7 @@ public class BlockManager {
     }

     // Ignore replicas already scheduled to be removed from the DN
-    if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) {
+    if(invalidateBlocks.contains(dn, block)) {
       /*
        * TODO: following assertion is incorrect, see HDFS-2668 assert
        * storedBlock.findDatanode(dn) < 0 : "Block " + block +
@@ -3199,9 +3200,8 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for removal during this iteration.
    */
-  private int invalidateWorkForOneNode(String nodeId) {
+  private int invalidateWorkForOneNode(DatanodeInfo dn) {
     final List<Block> toInvalidate;
-    final DatanodeDescriptor dn;

     namesystem.writeLock();
     try {
@@ -3210,15 +3210,13 @@ public class BlockManager {
         LOG.debug("In safemode, not computing replication work");
         return 0;
       }
-      // get blocks to invalidate for the nodeId
-      assert nodeId != null;
-      dn = datanodeManager.getDatanode(nodeId);
-      if (dn == null) {
-        invalidateBlocks.remove(nodeId);
-        return 0;
-      }
-      toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn);
-      if (toInvalidate == null) {
+      try {
+        toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn));
+
+        if (toInvalidate == null) {
+          return 0;
+        }
+      } catch(UnregisteredNodeException une) {
         return 0;
       }
     } finally {
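The catch clause above indicates the DatanodeInfo-based lookup reports an unknown node by throwing UnregisteredNodeException, where the old storage-ID lookup returned null and forced an inline check. A simplified sketch of that control-flow shift, with hypothetical types in place of DatanodeManager and DatanodeDescriptor:

    import java.util.HashMap;
    import java.util.Map;

    public class LookupStyleDemo {
      static class UnregisteredNodeException extends Exception {}

      private final Map<String, String> registered = new HashMap<>();

      // Before: a null return forces every caller to handle "missing" inline.
      int workBefore(String nodeId) {
        String dn = registered.get(nodeId);
        if (dn == null) {
          return 0;
        }
        return drain(dn);
      }

      // After: the lookup throws, and absence is handled in one place.
      int workAfter(String nodeId) {
        try {
          return drain(lookup(nodeId));
        } catch (UnregisteredNodeException e) {
          return 0;
        }
      }

      private String lookup(String nodeId) throws UnregisteredNodeException {
        String dn = registered.get(nodeId);
        if (dn == null) throw new UnregisteredNodeException();
        return dn;
      }

      private int drain(String dn) { return 1; } // placeholder for real work
    }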
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
@@ -44,13 +44,13 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 class InvalidateBlocks {
-  /** Mapping: StorageID -> Collection of Blocks */
-  private final Map<String, LightWeightHashSet<Block>> node2blocks =
-      new TreeMap<String, LightWeightHashSet<Block>>();
+  /** Mapping: DatanodeInfo -> Collection of Blocks */
+  private final Map<DatanodeInfo, LightWeightHashSet<Block>> node2blocks =
+      new TreeMap<DatanodeInfo, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;

-  private final DatanodeManager datanodeManager;
+  private final int blockInvalidateLimit;

   /**
    * The period of pending time for block invalidation since the NameNode
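Re-keying node2blocks by DatanodeInfo implies the key type carries a natural ordering, since the TreeMap is constructed without an explicit Comparator; a TreeMap built that way requires Comparable keys whose ordering is consistent with equals(). A minimal sketch with a hypothetical key class rather than Hadoop's DatanodeInfo:

    import java.util.Map;
    import java.util.TreeMap;

    public class KeyedByNodeDemo {
      // A no-comparator TreeMap needs Comparable keys; here the (hypothetical)
      // node identity is a UUID string, ordered and equated the same way.
      static final class NodeKey implements Comparable<NodeKey> {
        final String uuid;
        NodeKey(String uuid) { this.uuid = uuid; }
        @Override public int compareTo(NodeKey o) { return uuid.compareTo(o.uuid); }
        @Override public boolean equals(Object o) {
          return o instanceof NodeKey && uuid.equals(((NodeKey) o).uuid);
        }
        @Override public int hashCode() { return uuid.hashCode(); }
        @Override public String toString() { return uuid; }
      }

      public static void main(String[] args) {
        Map<NodeKey, Integer> blocksPerNode = new TreeMap<>();
        blocksPerNode.put(new NodeKey("dn-b"), 2);
        blocksPerNode.put(new NodeKey("dn-a"), 5);
        System.out.println(blocksPerNode); // iterates in key order: {dn-a=5, dn-b=2}
      }
    }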
@@ -60,8 +60,8 @@ class InvalidateBlocks {
   /** the startup time */
   private final long startupTime = Time.monotonicNow();

-  InvalidateBlocks(final DatanodeManager datanodeManager, long pendingPeriodInMs) {
-    this.datanodeManager = datanodeManager;
+  InvalidateBlocks(final int blockInvalidateLimit, long pendingPeriodInMs) {
+    this.blockInvalidateLimit = blockInvalidateLimit;
     this.pendingPeriodInMs = pendingPeriodInMs;
     printBlockDeletionTime(BlockManager.LOG);
   }
@@ -86,12 +86,9 @@ class InvalidateBlocks {
    * invalidation. Blocks are compared including their generation stamps:
    * if a block is pending invalidation but with a different generation stamp,
    * returns false.
-   * @param storageID the storage to check
-   * @param the block to look for
-   *
    */
-  synchronized boolean contains(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> s = node2blocks.get(storageID);
+  synchronized boolean contains(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> s = node2blocks.get(dn);
     if (s == null) {
       return false; // no invalidate blocks for this storage ID
     }
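Per the javadoc kept above, contains() must report false when the queued copy of a block carries a different generation stamp. The hunk only swaps the key type, so the stamp comparison presumably follows the set lookup; roughly this shape, sketched with a plain Map in place of LightWeightHashSet and a stripped-down hypothetical Block:

    import java.util.HashMap;
    import java.util.Map;

    public class GenStampContainsDemo {
      // Hypothetical minimal block: equal by ID only; the generation stamp
      // is compared separately, mirroring the documented behavior.
      static final class Block {
        final long id, genStamp;
        Block(long id, long genStamp) { this.id = id; this.genStamp = genStamp; }
        @Override public boolean equals(Object o) {
          return o instanceof Block && id == ((Block) o).id;
        }
        @Override public int hashCode() { return Long.hashCode(id); }
      }

      private final Map<String, Map<Block, Block>> node2blocks = new HashMap<>();

      boolean contains(String node, Block block) {
        Map<Block, Block> s = node2blocks.get(node);
        if (s == null) {
          return false;                 // nothing queued for this node
        }
        Block queued = s.get(block);    // lookup matches on block ID
        return queued != null && queued.genStamp == block.genStamp;
      }
    }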
@@ -106,10 +103,10 @@ class InvalidateBlocks {
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode);
     if (set == null) {
       set = new LightWeightHashSet<Block>();
-      node2blocks.put(datanode.getDatanodeUuid(), set);
+      node2blocks.put(datanode, set);
     }
     if (set.add(block)) {
       numBlocks++;
@@ -121,20 +118,20 @@ class InvalidateBlocks {
   }

   /** Remove a storage from the invalidatesSet */
-  synchronized void remove(final String storageID) {
-    final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID);
+  synchronized void remove(final DatanodeInfo dn) {
+    final LightWeightHashSet<Block> blocks = node2blocks.remove(dn);
     if (blocks != null) {
       numBlocks -= blocks.size();
     }
   }

   /** Remove the block from the specified storage. */
-  synchronized void remove(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> v = node2blocks.get(storageID);
+  synchronized void remove(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> v = node2blocks.get(dn);
     if (v != null && v.remove(block)) {
       numBlocks--;
       if (v.isEmpty()) {
-        node2blocks.remove(storageID);
+        node2blocks.remove(dn);
       }
     }
   }
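Both remove() overloads keep the aggregate numBlocks counter in step with the per-node sets, and the two-argument form discards a set once it empties so the map does not accumulate dead keys. The invariant is easiest to see in isolation; a sketch with plain JDK collections standing in for LightWeightHashSet:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class CountedMultimapDemo {
      private final Map<String, Set<Long>> node2blocks = new HashMap<>();
      private long numBlocks = 0;

      synchronized void add(String node, long block) {
        if (node2blocks.computeIfAbsent(node, k -> new HashSet<>()).add(block)) {
          numBlocks++;                  // count only genuinely new entries
        }
      }

      /** Drop a node and every block queued for it. */
      synchronized void remove(String node) {
        Set<Long> blocks = node2blocks.remove(node);
        if (blocks != null) {
          numBlocks -= blocks.size();   // keep the aggregate in step
        }
      }

      /** Drop one block for one node, discarding the set once it empties. */
      synchronized void remove(String node, long block) {
        Set<Long> v = node2blocks.get(node);
        if (v != null && v.remove(block)) {
          numBlocks--;
          if (v.isEmpty()) {
            node2blocks.remove(node);   // avoid leaking empty per-node sets
          }
        }
      }
    }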
@@ -148,18 +145,18 @@ class InvalidateBlocks {
       return;
     }

-    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+    for(Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
       final LightWeightHashSet<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
-        out.println(datanodeManager.getDatanode(entry.getKey()));
+        out.println(entry.getKey());
         out.println(blocks);
       }
     }
   }

   /** @return a list of the storage IDs. */
-  synchronized List<String> getStorageIDs() {
-    return new ArrayList<String>(node2blocks.keySet());
+  synchronized List<DatanodeInfo> getDatanodes() {
+    return new ArrayList<DatanodeInfo>(node2blocks.keySet());
   }

   /**
@@ -170,8 +167,7 @@ class InvalidateBlocks {
     return pendingPeriodInMs - (Time.monotonicNow() - startupTime);
   }

-  synchronized List<Block> invalidateWork(
-      final String storageId, final DatanodeDescriptor dn) {
+  synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
     final long delay = getInvalidationDelay();
     if (delay > 0) {
       if (BlockManager.LOG.isDebugEnabled()) {
@@ -181,18 +177,18 @@ class InvalidateBlocks {
       }
       return null;
     }
-    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
+    final LightWeightHashSet<Block> set = node2blocks.get(dn);
     if (set == null) {
       return null;
     }

     // # blocks that can be sent in one message is limited
-    final int limit = datanodeManager.blockInvalidateLimit;
+    final int limit = blockInvalidateLimit;
     final List<Block> toInvalidate = set.pollN(limit);

     // If we send everything in this message, remove this node entry
     if (set.isEmpty()) {
-      remove(storageId);
+      remove(dn);
     }

     dn.addBlocksToBeInvalidated(toInvalidate);
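invalidateWork caps each reply to a datanode at blockInvalidateLimit blocks via LightWeightHashSet.pollN, which removes what it returns, and the node's entry is deleted once its set drains empty. Roughly this shape with JDK types, pollN approximated by an iterator (a sketch, not Hadoop's implementation):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    public class BatchedDrainDemo {
      // Approximation of pollN: remove and return up to n items from the set.
      static <T> List<T> pollN(Set<T> set, int n) {
        List<T> out = new ArrayList<>(Math.min(n, set.size()));
        Iterator<T> it = set.iterator();
        while (it.hasNext() && out.size() < n) {
          out.add(it.next());
          it.remove();                  // polled items leave the pending set
        }
        return out;
      }

      public static void main(String[] args) {
        Set<Long> pending = new HashSet<>(List.of(1L, 2L, 3L, 4L, 5L));
        int limit = 3;                  // one message carries at most `limit` blocks
        while (!pending.isEmpty()) {
          System.out.println("invalidate batch: " + pollN(pending, limit));
        }
      }
    }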