HDFS-7611. deleteSnapshot and delete of a file can leave orphaned blocks in the blocksMap on NameNode restart. Contributed by Jing Zhao and Byron Wong.

(cherry picked from commit d244574d03)
This commit is contained in:
Jing Zhao 2015-01-28 15:24:28 -08:00
parent 12522fd9cb
commit e8300957a7
6 changed files with 71 additions and 21 deletions

View File

@ -520,6 +520,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7677. DistributedFileSystem#truncate should resolve symlinks. (yliu)
HDFS-7611. deleteSnapshot and delete of a file can leave orphaned blocks
in the blocksMap on NameNode restart. (jing9 and Byron Wong)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -223,20 +223,25 @@ private static long unprotectedDelete(
// set the parent's modification time
final INodeDirectory parent = targetNode.getParent();
parent.updateModificationTime(mtime, latestSnapshot);
fsd.updateCountForDelete(targetNode, iip);
if (removed == 0) {
return 0;
}
// collect block
// collect block and update quota
if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
targetNode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
} else {
Quota.Counts counts = targetNode.cleanSubtree(CURRENT_STATE_ID,
latestSnapshot, collectedBlocks, removedINodes, true);
parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
-counts.get(Quota.DISKSPACE), true);
removed = counts.get(Quota.NAMESPACE);
// TODO: quota verification may fail the deletion here. We should not
// count the snapshot diff into quota usage in the future.
fsd.updateCount(iip, -counts.get(Quota.NAMESPACE),
-counts.get(Quota.DISKSPACE), true);
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+ iip.getPath() + " is removed");

View File

@ -625,9 +625,12 @@ long removeSrc() throws IOException {
NameNode.stateChangeLog.warn("DIR* FSDirRenameOp.unprotectedRenameTo:" +
error);
throw new IOException(error);
} else {
// update the quota count if necessary
fsd.updateCountForDelete(srcChild, srcIIP);
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
return removedNum;
}
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
return removedNum;
}
boolean removeSrc4OldRename() throws IOException {
@ -638,6 +641,8 @@ boolean removeSrc4OldRename() throws IOException {
" can not be removed");
return false;
} else {
// update the quota count if necessary
fsd.updateCountForDelete(srcChild, srcIIP);
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
return true;
}
@ -647,6 +652,8 @@ long removeDst() throws IOException {
long removedNum = fsd.removeLastINode(dstIIP);
if (removedNum != -1) {
oldDstChild = dstIIP.getLastINode();
// update the quota count if necessary
fsd.updateCountForDelete(oldDstChild, dstIIP);
dstIIP = INodesInPath.replace(dstIIP, dstIIP.length() - 1, null);
}
return removedNum;

View File

@ -601,7 +601,22 @@ void updateSpaceConsumed(INodesInPath iip, long nsDelta, long dsDelta)
writeUnlock();
}
}
/**
* Update the quota usage after deletion. The quota update is only necessary
* when image/edits have been loaded and the file/dir to be deleted is not
* contained in snapshots.
*/
void updateCountForDelete(final INode inode, final INodesInPath iip)
throws QuotaExceededException {
if (getFSNamesystem().isImageLoaded() &&
!inode.isInLatestSnapshot(iip.getLatestSnapshotId())) {
Quota.Counts counts = inode.computeQuotaUsage();
updateCount(iip, -counts.get(Quota.NAMESPACE),
-counts.get(Quota.DISKSPACE), false);
}
}
void updateCount(INodesInPath iip, long nsDelta, long dsDelta,
boolean checkQuota) throws QuotaExceededException {
updateCount(iip, iip.length() - 1, nsDelta, dsDelta, checkQuota);
@ -904,11 +919,12 @@ INodesInPath addLastINodeNoQuotaCheck(INodesInPath existing, INode i) {
/**
* Remove the last inode in the path from the namespace.
* Count of each ancestor with quota is also updated.
* Note: the caller needs to update the ancestors' quota count.
*
* @return -1 for failing to remove;
* 0 for removing a reference whose referred inode has other
* reference nodes;
* >0 otherwise.
* 1 otherwise.
*/
long removeLastINode(final INodesInPath iip) throws QuotaExceededException {
final int latestSnapshot = iip.getLatestSnapshotId();
@ -917,19 +933,9 @@ long removeLastINode(final INodesInPath iip) throws QuotaExceededException {
if (!parent.removeChild(last, latestSnapshot)) {
return -1;
}
if (!last.isInLatestSnapshot(latestSnapshot)) {
final Quota.Counts counts = last.computeQuotaUsage();
updateCountNoQuotaCheck(iip, iip.length() - 1,
-counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
if (INodeReference.tryRemoveReference(last) > 0) {
return 0;
} else {
return counts.get(Quota.NAMESPACE);
}
}
return 1;
return (!last.isInLatestSnapshot(latestSnapshot)
&& INodeReference.tryRemoveReference(last) > 0) ? 0 : 1;
}
static String normalizePath(String src) {

View File

@ -1194,7 +1194,9 @@ public void waitClusterUp() throws IOException {
} catch (InterruptedException e) {
}
if (++i > 10) {
throw new IOException("Timed out waiting for Mini HDFS Cluster to start");
final String msg = "Timed out waiting for Mini HDFS Cluster to start";
LOG.error(msg);
throw new IOException(msg);
}
}
}

View File

@ -1122,4 +1122,31 @@ public void testHANNRestartAfterSnapshotDeletion() throws Exception {
// wait till the cluster becomes active
cluster.waitClusterUp();
}
@Test
public void testCorrectNumberOfBlocksAfterRestart() throws IOException {
final Path foo = new Path("/foo");
final Path bar = new Path(foo, "bar");
final Path file = new Path(foo, "file");
final String snapshotName = "ss0";
DFSTestUtil.createFile(hdfs, file, BLOCKSIZE, REPLICATION, seed);
hdfs.mkdirs(bar);
hdfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
hdfs.setQuota(bar, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
hdfs.allowSnapshot(foo);
hdfs.createSnapshot(foo, snapshotName);
hdfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
hdfs.saveNamespace();
hdfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
hdfs.deleteSnapshot(foo, snapshotName);
hdfs.delete(bar, true);
hdfs.delete(foo, true);
long numberOfBlocks = cluster.getNamesystem().getBlocksTotal();
cluster.restartNameNode(0);
assertEquals(numberOfBlocks, cluster.getNamesystem().getBlocksTotal());
}
}