HDFS-2938. Merging change 1244752 from trunk to 0.23

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1244754 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-02-15 22:02:16 +00:00
parent 97a2b5a1bf
commit 7c6f6b0d2b
3 changed files with 21 additions and 14 deletions

View File

@ -41,6 +41,9 @@ Release 0.23.2 - UNRELEASED
HDFS-2525. Race between BlockPoolSliceScanner and append. (Brandon Li
via jitendra)
HDFS-2938. Recursive delete of a large directory make namenode
unresponsive. (Hari Mankude via suresh)
Release 0.23.1 - 2012-02-08
INCOMPATIBLE CHANGES

View File

@ -1928,15 +1928,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} finally {
writeUnlock();
}
getEditLog().logSync();
writeLock();
try {
removeBlocks(collectedBlocks); // Incremental deletion of blocks
} finally {
writeUnlock();
}
getEditLog().logSync();
removeBlocks(collectedBlocks); // Incremental deletion of blocks
collectedBlocks.clear();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
@ -1945,16 +1938,24 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return true;
}
/** From the given list, incrementally remove the blocks from blockManager */
/**
* From the given list, incrementally remove the blocks from blockManager
* Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to
* ensure that other waiters on the lock can get in. See HDFS-2938
*/
private void removeBlocks(List<Block> blocks) {
assert hasWriteLock();
int start = 0;
int end = 0;
while (start < blocks.size()) {
end = BLOCK_DELETION_INCREMENT + start;
end = end > blocks.size() ? blocks.size() : end;
for (int i=start; i<end; i++) {
blockManager.removeBlock(blocks.get(i));
writeLock();
try {
for (int i = start; i < end; i++) {
blockManager.removeBlock(blocks.get(i));
}
} finally {
writeUnlock();
}
start = end;
}

View File

@ -117,8 +117,11 @@ public class TestLargeDirectoryDelete {
try {
int blockcount = getBlockCount();
if (blockcount < TOTAL_BLOCKS && blockcount > 0) {
synchronized(mc.getNamesystem()) {
mc.getNamesystem().writeLock();
try {
lockOps++;
} finally {
mc.getNamesystem().writeUnlock();
}
Thread.sleep(1);
}