DFS-1257. Fix a race condition on BlockManager.recentInvalidateSets. Contributed by Eric Payne

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1158933 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-08-17 22:00:27 +00:00
parent b094465168
commit cc875f0124
2 changed files with 83 additions and 58 deletions

View File

@ -972,6 +972,9 @@ Trunk (unreleased changes)
HDFS-73. DFSOutputStream does not close all the sockets.
(Uma Maheswara Rao G via eli)
HDFS-1257. Fix a race condition on BlockManager.recentInvalidateSets.
(Eric Payne via szetszwo)
BREAKDOWN OF HDFS-1073 SUBTASKS
HDFS-1521. Persist transaction ID on disk between NN restarts.

View File

@ -758,18 +758,23 @@ void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
}
private void removeFromInvalidates(String storageID, Block block) {
Collection<Block> v = recentInvalidateSets.get(storageID);
if (v != null && v.remove(block)) {
pendingDeletionBlocksCount--;
if (v.isEmpty()) {
recentInvalidateSets.remove(storageID);
synchronized(recentInvalidateSets) {
Collection<Block> v = recentInvalidateSets.get(storageID);
if (v != null && v.remove(block)) {
pendingDeletionBlocksCount--;
if (v.isEmpty()) {
recentInvalidateSets.remove(storageID);
}
}
}
}
boolean belongsToInvalidates(String storageID, Block block) {
Collection<Block> invalidateSet = recentInvalidateSets.get(storageID);
return invalidateSet != null && invalidateSet.contains(block);
Collection<Block> invalidateSet;
synchronized(recentInvalidateSets) {
invalidateSet = recentInvalidateSets.get(storageID);
return invalidateSet != null && invalidateSet.contains(block);
}
}
/**
@ -781,17 +786,19 @@ boolean belongsToInvalidates(String storageID, Block block) {
* @param log true to create an entry in the log
*/
private void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
Collection<Block> invalidateSet = recentInvalidateSets
.get(dn.getStorageID());
if (invalidateSet == null) {
invalidateSet = new HashSet<Block>();
recentInvalidateSets.put(dn.getStorageID(), invalidateSet);
}
if (invalidateSet.add(b)) {
pendingDeletionBlocksCount++;
if (log) {
NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
+ b + " to " + dn.getName());
synchronized(recentInvalidateSets) {
Collection<Block> invalidateSet = recentInvalidateSets
.get(dn.getStorageID());
if (invalidateSet == null) {
invalidateSet = new HashSet<Block>();
recentInvalidateSets.put(dn.getStorageID(), invalidateSet);
}
if (invalidateSet.add(b)) {
pendingDeletionBlocksCount++;
if (log) {
NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
+ b + " to " + dn.getName());
}
}
}
}
@ -830,16 +837,21 @@ private void addToInvalidates(Block b) {
*/
private void dumpRecentInvalidateSets(PrintWriter out) {
assert namesystem.hasWriteLock();
int size = recentInvalidateSets.values().size();
int size;
synchronized(recentInvalidateSets) {
size = recentInvalidateSets.values().size();
}
out.println("Metasave: Blocks " + pendingDeletionBlocksCount
+ " waiting deletion from " + size + " datanodes.");
if (size == 0) {
return;
}
for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
Collection<Block> blocks = entry.getValue();
if (blocks.size() > 0) {
out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
synchronized(recentInvalidateSets) {
for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
Collection<Block> blocks = entry.getValue();
if (blocks.size() > 0) {
out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
}
}
}
}
@ -950,13 +962,16 @@ public int getUnderReplicatedNotMissingBlocks() {
* @return total number of block for deletion
*/
int computeInvalidateWork(int nodesToProcess) {
int numOfNodes = recentInvalidateSets.size();
nodesToProcess = Math.min(numOfNodes, nodesToProcess);
int numOfNodes;
ArrayList<String> keyArray;
// TODO should using recentInvalidateSets be synchronized?
// get an array of the keys
ArrayList<String> keyArray =
new ArrayList<String>(recentInvalidateSets.keySet());
synchronized(recentInvalidateSets) {
numOfNodes = recentInvalidateSets.size();
// get an array of the keys
keyArray = new ArrayList<String>(recentInvalidateSets.keySet());
}
nodesToProcess = Math.min(numOfNodes, nodesToProcess);
// randomly pick up <i>nodesToProcess</i> nodes
// and put them at [0, nodesToProcess)
@ -2428,7 +2443,10 @@ private int getReplication(Block block) {
/** Remove a datanode from the invalidatesSet */
private void removeFromInvalidates(String storageID) {
Collection<Block> blocks = recentInvalidateSets.remove(storageID);
Collection<Block> blocks;
synchronized(recentInvalidateSets) {
blocks = recentInvalidateSets.remove(storageID);
}
if (blocks != null) {
pendingDeletionBlocksCount -= blocks.size();
}
@ -2454,39 +2472,43 @@ private int invalidateWorkForOneNode(String nodeId) {
return 0;
}
Collection<Block> invalidateSet = recentInvalidateSets.get(nodeId);
if (invalidateSet == null)
return 0;
Collection<Block> invalidateSet;
ArrayList<Block> blocksToInvalidate;
synchronized(recentInvalidateSets) {
invalidateSet = recentInvalidateSets.get(nodeId);
if (invalidateSet == null)
return 0;
ArrayList<Block> blocksToInvalidate = new ArrayList<Block>(
blocksToInvalidate = new ArrayList<Block>(
getDatanodeManager().blockInvalidateLimit);
// # blocks that can be sent in one message is limited
Iterator<Block> it = invalidateSet.iterator();
for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit
&& it.hasNext(); blkCount++) {
blocksToInvalidate.add(it.next());
it.remove();
}
// If we send everything in this message, remove this node entry
if (!it.hasNext()) {
removeFromInvalidates(nodeId);
}
dn.addBlocksToBeInvalidated(blocksToInvalidate);
if (NameNode.stateChangeLog.isInfoEnabled()) {
StringBuilder blockList = new StringBuilder();
for (Block blk : blocksToInvalidate) {
blockList.append(' ');
blockList.append(blk);
// # blocks that can be sent in one message is limited
Iterator<Block> it = invalidateSet.iterator();
for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit
&& it.hasNext(); blkCount++) {
blocksToInvalidate.add(it.next());
it.remove();
}
NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName()
+ " to delete " + blockList);
// If we send everything in this message, remove this node entry
if (!it.hasNext()) {
removeFromInvalidates(nodeId);
}
dn.addBlocksToBeInvalidated(blocksToInvalidate);
if (NameNode.stateChangeLog.isInfoEnabled()) {
StringBuilder blockList = new StringBuilder();
for (Block blk : blocksToInvalidate) {
blockList.append(' ');
blockList.append(blk);
}
NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName()
+ " to delete " + blockList);
}
pendingDeletionBlocksCount -= blocksToInvalidate.size();
return blocksToInvalidate.size();
}
pendingDeletionBlocksCount -= blocksToInvalidate.size();
return blocksToInvalidate.size();
} finally {
namesystem.writeUnlock();
}