HDFS-2938. Recursive delete of a large directory make namenode unresponsive. Contributed by Hari Mankude.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1244752 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-02-15 22:00:08 +00:00
parent f82e862e1f
commit ff91453227
3 changed files with 21 additions and 14 deletions

View File

@ -251,6 +251,9 @@ Release 0.23.2 - UNRELEASED
HDFS-2950. Secondary NN HTTPS address should be listed as a
NAMESERVICE_SPECIFIC_KEY. (todd)
HDFS-2938. Recursive delete of a large directory make namenode
unresponsive. (Hari Mankude via suresh)
Release 0.23.1 - 2012-02-08
INCOMPATIBLE CHANGES

View File

@ -1980,15 +1980,8 @@ private boolean deleteInternal(String src, boolean recursive,
} finally {
writeUnlock();
}
getEditLog().logSync();
writeLock();
try {
removeBlocks(collectedBlocks); // Incremental deletion of blocks
} finally {
writeUnlock();
}
getEditLog().logSync();
removeBlocks(collectedBlocks); // Incremental deletion of blocks
collectedBlocks.clear();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
@ -1997,16 +1990,24 @@ private boolean deleteInternal(String src, boolean recursive,
return true;
}
/** From the given list, incrementally remove the blocks from blockManager */
/**
* From the given list, incrementally remove the blocks from blockManager
* Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to
* ensure that other waiters on the lock can get in. See HDFS-2938
*/
private void removeBlocks(List<Block> blocks) {
assert hasWriteLock();
int start = 0;
int end = 0;
while (start < blocks.size()) {
end = BLOCK_DELETION_INCREMENT + start;
end = end > blocks.size() ? blocks.size() : end;
for (int i=start; i<end; i++) {
blockManager.removeBlock(blocks.get(i));
writeLock();
try {
for (int i = start; i < end; i++) {
blockManager.removeBlock(blocks.get(i));
}
} finally {
writeUnlock();
}
start = end;
}

View File

@ -117,8 +117,11 @@ protected void execute() throws Throwable {
try {
int blockcount = getBlockCount();
if (blockcount < TOTAL_BLOCKS && blockcount > 0) {
synchronized(mc.getNamesystem()) {
mc.getNamesystem().writeLock();
try {
lockOps++;
} finally {
mc.getNamesystem().writeUnlock();
}
Thread.sleep(1);
}