From 0fa5cad0b27780c27a284c23101b1099d4886506 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Wed, 24 Apr 2013 20:31:06 +0000 Subject: [PATCH] HDFS-4686. Update quota computation for rename and INodeReference. Contributed by Jing Zhao git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1471647 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES.HDFS-2802.txt | 3 + .../hdfs/server/namenode/FSDirectory.java | 61 +++++- .../hadoop/hdfs/server/namenode/FSImage.java | 6 +- .../hdfs/server/namenode/FSImageFormat.java | 13 +- .../server/namenode/FSImageSerialization.java | 2 + .../hadoop/hdfs/server/namenode/INode.java | 48 ++++- .../hdfs/server/namenode/INodeDirectory.java | 30 ++- .../namenode/INodeDirectoryWithQuota.java | 12 +- .../hdfs/server/namenode/INodeFile.java | 35 ++- .../hdfs/server/namenode/INodeReference.java | 149 +++++++++++-- .../hdfs/server/namenode/INodeSymlink.java | 2 +- .../snapshot/AbstractINodeDiffList.java | 27 ++- .../snapshot/INodeDirectorySnapshottable.java | 4 +- .../snapshot/INodeDirectoryWithSnapshot.java | 202 ++++++++++++++---- .../server/namenode/snapshot/Snapshot.java | 13 +- .../snapshot/SnapshotFSImageFormat.java | 1 - .../snapshot/TestRenameWithSnapshots.java | 90 +++++++- 17 files changed, 561 insertions(+), 137 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt index e9291cece3c..24a161bd022 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt @@ -293,3 +293,6 @@ Branch-2802 Snapshot (Unreleased) HDFS-4738. Changes AbstractINodeDiff to implement Comparable, and fix javadoc and other warnings. (szetszwo) + + HDFS-4686. Update quota computation for rename and INodeReference. + (Jing Zhao via szetszwo) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 1f214d7858c..c72574d349b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -614,14 +614,18 @@ public class FSDirectory implements Closeable { // check srcChild for reference final INodeReference.WithCount withCount; + Quota.Counts oldSrcCounts = Quota.Counts.newInstance(); int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference() .getDstSnapshotId() : Snapshot.INVALID_ID; if (isSrcInSnapshot) { final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory() - .replaceChild4ReferenceWithName(srcChild); + .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshot()); withCount = (INodeReference.WithCount) withName.getReferredINode(); srcChild = withName; srcIIP.setLastINode(srcChild); + // get the counts before rename + withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true, + Snapshot.INVALID_ID); } else if (srcChildIsReference) { // srcChild is reference but srcChild is not in latest snapshot withCount = (WithCount) srcChild.asReference().getReferredINode(); @@ -661,8 +665,6 @@ public class FSDirectory implements Closeable { final INodeReference.DstReference ref = new INodeReference.DstReference( dstParent.asDirectory(), withCount, dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId()); - withCount.setParentReference(ref); - withCount.incrementReferenceCount(); toDst = ref; } @@ -677,8 +679,18 @@ public class FSDirectory implements Closeable { srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot()); dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot()); // update moved leases with new filename - getFSNamesystem().unprotectedChangeLease(src, dst); + getFSNamesystem().unprotectedChangeLease(src, dst); + // update the quota usage in src tree + if (isSrcInSnapshot) { + // get the counts after rename + Quota.Counts newSrcCounts = srcChild.computeQuotaUsage( + Quota.Counts.newInstance(), false, Snapshot.INVALID_ID); + newSrcCounts.subtract(oldSrcCounts); + srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE), + newSrcCounts.get(Quota.DISKSPACE), false, Snapshot.INVALID_ID); + } + return true; } } finally { @@ -689,16 +701,21 @@ public class FSDirectory implements Closeable { if (withCount == null) { srcChild.setLocalName(srcChildName); } else if (!srcChildIsReference) { // src must be in snapshot + // the withCount node will no longer be used thus no need to update + // its reference number here final INode originalChild = withCount.getReferredINode(); srcChild = originalChild; } else { + withCount.removeReference(oldSrcChild.asReference()); final INodeReference originalRef = new INodeReference.DstReference( srcParent, withCount, srcRefDstSnapshot); - withCount.setParentReference(originalRef); srcChild = originalRef; } if (isSrcInSnapshot) { + // srcParent must be an INodeDirectoryWithSnapshot instance since + // isSrcInSnapshot is true and src node has been removed from + // srcParent ((INodeDirectoryWithSnapshot) srcParent).undoRename4ScrParent( oldSrcChild.asReference(), srcChild, srcIIP.getLatestSnapshot()); } else { @@ -849,12 +866,16 @@ public class FSDirectory implements Closeable { final INodeReference.WithCount withCount; int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference() .getDstSnapshotId() : Snapshot.INVALID_ID; + Quota.Counts oldSrcCounts = Quota.Counts.newInstance(); if (isSrcInSnapshot) { final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory() - .replaceChild4ReferenceWithName(srcChild); + .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshot()); withCount = (INodeReference.WithCount) withName.getReferredINode(); srcChild = withName; srcIIP.setLastINode(srcChild); + // get the counts before rename + withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true, + Snapshot.INVALID_ID); } else if (srcChildIsReference) { // srcChild is reference but srcChild is not in latest snapshot withCount = (WithCount) srcChild.asReference().getReferredINode(); @@ -902,8 +923,6 @@ public class FSDirectory implements Closeable { final INodeReference.DstReference ref = new INodeReference.DstReference( dstIIP.getINode(-2).asDirectory(), withCount, dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId()); - withCount.setParentReference(ref); - withCount.incrementReferenceCount(); toDst = ref; } @@ -941,6 +960,17 @@ public class FSDirectory implements Closeable { // deleted. Need to update the SnapshotManager. namesystem.removeSnapshottableDirs(snapshottableDirs); } + + // update the quota usage in src tree + if (isSrcInSnapshot) { + // get the counts after rename + Quota.Counts newSrcCounts = srcChild.computeQuotaUsage( + Quota.Counts.newInstance(), false, Snapshot.INVALID_ID); + newSrcCounts.subtract(oldSrcCounts); + srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE), + newSrcCounts.get(Quota.DISKSPACE), false, Snapshot.INVALID_ID); + } + return filesDeleted >= 0; } } finally { @@ -952,12 +982,14 @@ public class FSDirectory implements Closeable { if (withCount == null) { srcChild.setLocalName(srcChildName); } else if (!srcChildIsReference) { // src must be in snapshot + // the withCount node will no longer be used thus no need to update + // its reference number here final INode originalChild = withCount.getReferredINode(); srcChild = originalChild; } else { + withCount.removeReference(oldSrcChild.asReference()); final INodeReference originalRef = new INodeReference.DstReference( srcParent, withCount, srcRefDstSnapshot); - withCount.setParentReference(originalRef); srcChild = originalRef; } @@ -978,6 +1010,12 @@ public class FSDirectory implements Closeable { } else { addLastINodeNoQuotaCheck(dstIIP, removedDst); } + if (removedDst.isReference()) { + final INodeReference removedDstRef = removedDst.asReference(); + final INodeReference.WithCount wc = + (WithCount) removedDstRef.getReferredINode().asReference(); + wc.addReference(removedDstRef); + } } } NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " @@ -1368,7 +1406,7 @@ public class FSDirectory implements Closeable { Quota.Counts counts = targetNode.cleanSubtree(null, latestSnapshot, collectedBlocks, removedINodes); parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE), - -counts.get(Quota.DISKSPACE), true); + -counts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID); removed = counts.get(Quota.NAMESPACE); } if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -2588,7 +2626,8 @@ public class FSDirectory implements Closeable { } @Override - public Counts computeQuotaUsage(Counts counts, boolean useCache) { + public Counts computeQuotaUsage(Counts counts, boolean useCache, + int lastSnapshotId) { return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index a636ea40151..66433b91541 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -749,16 +749,16 @@ public class FSImage implements Closeable { * throw QuotaExceededException. */ static void updateCountForQuota(INodeDirectoryWithQuota root) { - updateCountForQuotaRecursively(root, new Quota.Counts()); + updateCountForQuotaRecursively(root, Quota.Counts.newInstance()); } private static void updateCountForQuotaRecursively(INodeDirectory dir, Quota.Counts counts) { final long parentNamespace = counts.get(Quota.NAMESPACE); final long parentDiskspace = counts.get(Quota.DISKSPACE); - - dir.computeQuotaUsage4CurrentDirectory(counts); + dir.computeQuotaUsage4CurrentDirectory(counts); + for (INode child : dir.getChildrenList(null)) { if (child.isDirectory()) { updateCountForQuotaRecursively(child.asDirectory(), counts); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index 0b5bfa0c092..8103971d7ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@ -674,19 +674,18 @@ public class FSImageFormat { //reference final boolean isWithName = in.readBoolean(); - int dstSnapshotId = Snapshot.INVALID_ID; - if (!isWithName) { - dstSnapshotId = in.readInt(); - } + // lastSnapshotId for WithName node, dstSnapshotId for DstReference node + int snapshotId = in.readInt(); + final INodeReference.WithCount withCount = referenceMap.loadINodeReferenceWithCount(isSnapshotINode, in, this); if (isWithName) { - return new INodeReference.WithName(null, withCount, localName); + return new INodeReference.WithName(null, withCount, localName, + snapshotId); } else { final INodeReference ref = new INodeReference.DstReference(null, - withCount, dstSnapshotId); - withCount.setParentReference(ref); + withCount, snapshotId); return ref; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java index 48aa9c6bbc0..63800f4803a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java @@ -270,6 +270,8 @@ public class FSImageSerialization { Preconditions.checkState(ref instanceof INodeReference.DstReference); // dst snapshot id out.writeInt(((INodeReference.DstReference) ref).getDstSnapshotId()); + } else { + out.writeInt(((INodeReference.WithName) ref).getLastSnapshotId()); } final INodeReference.WithCount withCount diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index 28da7bf1172..2dfb4715bbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.namenode.Content.CountsMap.Key; +import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference; +import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; @@ -386,11 +388,17 @@ public abstract class INode implements Diff.Element, LinkedElement { * Check and add namespace/diskspace consumed to itself and the ancestors. * @throws QuotaExceededException if quote is violated. */ - public void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify) - throws QuotaExceededException { - final INodeDirectory parentDir = getParent(); - if (parentDir != null) { - parentDir.addSpaceConsumed(nsDelta, dsDelta, verify); + public void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify, + int snapshotId) throws QuotaExceededException { + if (parent != null) { + parent.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId); + } + } + + public void addSpaceConsumedToRenameSrc(long nsDelta, long dsDelta, + boolean verify, int snapshotId) throws QuotaExceededException { + if (parent != null) { + parent.addSpaceConsumedToRenameSrc(nsDelta, dsDelta, verify, snapshotId); } } @@ -420,11 +428,39 @@ public abstract class INode implements Diff.Element, LinkedElement { /** * Count subtree {@link Quota#NAMESPACE} and {@link Quota#DISKSPACE} usages. * + * With the existence of {@link INodeReference}, the same inode and its + * subtree may be referred by multiple {@link WithName} nodes and a + * {@link DstReference} node. To avoid circles while quota usage computation, + * we have the following rules: + * + *
+   * 1. For a {@link DstReference} node, since the node must be in the current
+   * tree (or has been deleted as the end point of a series of rename 
+   * operations), we compute the quota usage of the referred node (and its 
+   * subtree) in the regular manner, i.e., including every inode in the current
+   * tree and in snapshot copies, as well as the size of diff list.
+   * 
+   * 2. For a {@link WithName} node, since the node must be in a snapshot, we 
+   * only count the quota usage for those nodes that still existed at the 
+   * creation time of the snapshot associated with the {@link WithName} node.
+   * We do not count in the size of the diff list.  
+   * 
+   * 
    * @param counts The subtree counts for returning.
+   * @param useCache Whether to use cached quota usage. Note that 
+   *                 {@link WithName} node never uses cache for its subtree.
+   * @param lastSnapshotId {@link Snapshot#INVALID_ID} indicates the computation
+   *                       is in the current tree. Otherwise the id indicates
+   *                       the computation range for a {@link WithName} node.
    * @return The same objects as the counts parameter.
    */
   public abstract Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean useCache);
+      boolean useCache, int lastSnapshotId);
+
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache) {
+    return computeQuotaUsage(counts, useCache, Snapshot.INVALID_ID);
+  }
   
   /**
    * @return null if the local name is null; otherwise, return the local name.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index 1574783ede7..a312977cbea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.Content.CountsMap.Key;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
@@ -204,17 +205,28 @@ public class INodeDirectory extends INodeWithAdditionalFields {
     Preconditions.checkState(i >= 0);
     Preconditions.checkState(oldChild == children.get(i));
     
-    // TODO: the first case may never be hit
     if (oldChild.isReference() && !newChild.isReference()) {
+      // replace the referred inode, e.g., 
+      // INodeFileWithSnapshot -> INodeFileUnderConstructionWithSnapshot
+      // TODO: add a unit test for rename + append
       final INode withCount = oldChild.asReference().getReferredINode();
       withCount.asReference().setReferredINode(newChild);
     } else {
+      if (oldChild.isReference()) {
+        // both are reference nodes, e.g., DstReference -> WithName
+        final INodeReference.WithCount withCount = 
+            (WithCount) oldChild.asReference().getReferredINode();
+        withCount.removeReference(oldChild.asReference());
+      }
+      // do the replacement
       final INode removed = children.set(i, newChild);
       Preconditions.checkState(removed == oldChild);
     }
   }
 
-  INodeReference.WithName replaceChild4ReferenceWithName(INode oldChild) {
+  INodeReference.WithName replaceChild4ReferenceWithName(INode oldChild,
+      Snapshot latest) {
+    Preconditions.checkArgument(latest != null);
     if (oldChild instanceof INodeReference.WithName) {
       return (INodeReference.WithName)oldChild;
     }
@@ -227,7 +239,7 @@ public class INodeDirectory extends INodeWithAdditionalFields {
       withCount = new INodeReference.WithCount(null, oldChild);
     }
     final INodeReference.WithName ref = new INodeReference.WithName(this,
-        withCount, oldChild.getLocalNameBytes());
+        withCount, oldChild.getLocalNameBytes(), latest.getId());
     replaceChild(oldChild, ref);
     return ref;
   }
@@ -415,20 +427,20 @@ public class INodeDirectory extends INodeWithAdditionalFields {
   }
 
   @Override
-  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache) {
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
+      int lastSnapshotId) {
     if (children != null) {
       for (INode child : children) {
-        child.computeQuotaUsage(counts, useCache);
+        child.computeQuotaUsage(counts, useCache, lastSnapshotId);
       }
     }
-
-    return computeQuotaUsage4CurrentDirectory(counts);    
+    return computeQuotaUsage4CurrentDirectory(counts);
   }
-
+  
   /** Add quota usage for this inode excluding children. */
   public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
     counts.add(Quota.NAMESPACE, 1);
-    return counts;    
+    return counts;
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
index 21918f4eb9c..1148f320aef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
@@ -94,14 +94,14 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
   }
   
   @Override
-  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean useCache) {
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
+      int lastSnapshotId) {
     if (useCache && isQuotaSet()) {
       // use cache value
       counts.add(Quota.NAMESPACE, namespace);
       counts.add(Quota.DISKSPACE, diskspace);
     } else {
-      super.computeQuotaUsage(counts, false);
+      super.computeQuotaUsage(counts, false, lastSnapshotId);
     }
     return counts;
   }
@@ -141,7 +141,7 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
   
   @Override
   public final void addSpaceConsumed(final long nsDelta, final long dsDelta,
-      boolean verify) throws QuotaExceededException {
+      boolean verify, int snapshotId) throws QuotaExceededException {
     if (isQuotaSet()) { 
       // The following steps are important: 
       // check quotas in this inode and all ancestors before changing counts
@@ -152,11 +152,11 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
         verifyQuota(nsDelta, dsDelta);
       }
       // (2) verify quota and then add count in ancestors 
-      super.addSpaceConsumed(nsDelta, dsDelta, verify);
+      super.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
       // (3) add count in this inode
       addSpaceConsumed2Cache(nsDelta, dsDelta);
     } else {
-      super.addSpaceConsumed(nsDelta, dsDelta, verify);
+      super.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
     }
   }
   
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 80711bdd241..54aa9e0c236 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -329,13 +329,32 @@ public class INodeFile extends INodeWithAdditionalFields implements BlockCollect
 
   @Override
   public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean useCache) {
-    counts.add(Quota.NAMESPACE, this instanceof FileWithSnapshot?
-        ((FileWithSnapshot)this).getDiffs().asList().size() + 1: 1);
-    counts.add(Quota.DISKSPACE, diskspaceConsumed());
+      boolean useCache, int lastSnapshotId) {
+    long nsDelta = 1;
+    final long dsDelta;
+    if (this instanceof FileWithSnapshot) {
+      FileDiffList fileDiffList = ((FileWithSnapshot) this).getDiffs();
+      Snapshot last = fileDiffList.getLastSnapshot();
+      List diffs = fileDiffList.asList();
+
+      if (lastSnapshotId == Snapshot.INVALID_ID || last == null) {
+        nsDelta += diffs.size();
+        dsDelta = diskspaceConsumed();
+      } else if (last.getId() < lastSnapshotId) {
+        dsDelta = computeFileSize(true, false) * getFileReplication();
+      } else {
+        Snapshot s = fileDiffList.searchSnapshotById(lastSnapshotId);
+        dsDelta = diskspaceConsumed(s);      
+      }
+    } else {
+      dsDelta = diskspaceConsumed();
+    }
+    counts.add(Quota.NAMESPACE, nsDelta);
+    counts.add(Quota.DISKSPACE, dsDelta);
     return counts;
   }
 
+
   @Override
   public final Content.CountsMap computeContentSummary(
       final Content.CountsMap countsMap) {
@@ -448,6 +467,14 @@ public class INodeFile extends INodeWithAdditionalFields implements BlockCollect
     // use preferred block size for the last block if it is under construction
     return computeFileSize(true, true) * getBlockReplication();
   }
+
+  public final long diskspaceConsumed(Snapshot lastSnapshot) {
+    if (lastSnapshot != null) {
+      return computeFileSize(lastSnapshot) * getFileReplication(lastSnapshot);
+    } else {
+      return diskspaceConsumed();
+    }
+  }
   
   /**
    * Return the penultimate allocated block for this file.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
index 744336a6e94..428194e8276 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -76,11 +80,10 @@ public abstract class INodeReference extends INode {
     if (!(referred instanceof WithCount)) {
       return -1;
     }
+    
     WithCount wc = (WithCount) referred;
-    if (ref == wc.getParentReference()) {
-      wc.setParent(null);
-    }
-    return ((WithCount)referred).decrementReferenceCount();
+    wc.removeReference(ref);
+    return wc.getReferenceCount();
   }
 
   private INode referred;
@@ -222,7 +225,7 @@ public abstract class INodeReference extends INode {
   }
 
   @Override
-  public final Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
+  public Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
       BlocksMapUpdateInfo collectedBlocks, final List removedINodes)
       throws QuotaExceededException {
     return referred.cleanSubtree(snapshot, prior, collectedBlocks,
@@ -248,8 +251,9 @@ public abstract class INodeReference extends INode {
   }
 
   @Override
-  public final Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache) {
-    return referred.computeQuotaUsage(counts, useCache);
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
+      int lastSnapshotId) {
+    return referred.computeQuotaUsage(counts, useCache, lastSnapshotId);
   }
   
   @Override
@@ -257,12 +261,6 @@ public abstract class INodeReference extends INode {
     return referred.getSnapshotINode(snapshot);
   }
 
-  @Override
-  public final void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify)
-      throws QuotaExceededException {
-    referred.addSpaceConsumed(nsDelta, dsDelta, verify);
-  }
-
   @Override
   public final long getNsQuota() {
     return referred.getNsQuota();
@@ -302,10 +300,11 @@ public abstract class INodeReference extends INode {
   public int getDstSnapshotId() {
     return Snapshot.INVALID_ID;
   }
-
+  
   /** An anonymous reference with reference count. */
   public static class WithCount extends INodeReference {
-    private int referenceCount = 1;
+    
+    private final List withNameList = new ArrayList();
     
     public WithCount(INodeReference parent, INode referred) {
       super(parent, referred);
@@ -313,30 +312,88 @@ public abstract class INodeReference extends INode {
       referred.setParentReference(this);
     }
     
-    /** @return the reference count. */
     public int getReferenceCount() {
-      return referenceCount;
+      int count = withNameList.size();
+      if (getParentReference() != null) {
+        count++;
+      }
+      return count;
     }
 
     /** Increment and then return the reference count. */
-    public int incrementReferenceCount() {
-      return ++referenceCount;
+    public void addReference(INodeReference ref) {
+      if (ref instanceof WithName) {
+        withNameList.add((WithName) ref);
+      } else if (ref instanceof DstReference) {
+        setParentReference(ref);
+      }
     }
 
     /** Decrement and then return the reference count. */
-    public int decrementReferenceCount() {
-      return --referenceCount;
+    public void removeReference(INodeReference ref) {
+      if (ref instanceof WithName) {
+        Iterator iter = withNameList.iterator();
+        while (iter.hasNext()) {
+          if (iter.next() == ref) {
+            iter.remove();
+            break;
+          }
+        }
+      } else if (ref == getParentReference()) {
+        setParent(null);
+      }
+    }
+    
+    @Override
+    public final void addSpaceConsumed(long nsDelta, long dsDelta,
+        boolean verify, int snapshotId) throws QuotaExceededException {
+      INodeReference parentRef = getParentReference();
+      if (parentRef != null) {
+        parentRef.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
+      }
+      addSpaceConsumedToRenameSrc(nsDelta, dsDelta, verify, snapshotId);
+    }
+    
+    @Override
+    public final void addSpaceConsumedToRenameSrc(long nsDelta, long dsDelta,
+        boolean verify, int snapshotId) throws QuotaExceededException {
+      if (snapshotId != Snapshot.INVALID_ID) {
+        // sort withNameList based on the lastSnapshotId
+        Collections.sort(withNameList, new Comparator() {
+          @Override
+          public int compare(WithName w1, WithName w2) {
+            return w1.lastSnapshotId - w2.lastSnapshotId;
+          }
+        });
+        for (INodeReference.WithName withName : withNameList) {
+          if (withName.getLastSnapshotId() >= snapshotId) {
+            withName.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
+            break;
+          }
+        }
+      }
     }
   }
-
+  
   /** A reference with a fixed name. */
   public static class WithName extends INodeReference {
 
     private final byte[] name;
 
-    public WithName(INodeDirectory parent, WithCount referred, byte[] name) {
+    /**
+     * The id of the last snapshot in the src tree when this WithName node was 
+     * generated. When calculating the quota usage of the referred node, only 
+     * the files/dirs existing when this snapshot was taken will be counted for 
+     * this WithName node and propagated along its ancestor path.
+     */
+    private final int lastSnapshotId;
+    
+    public WithName(INodeDirectory parent, WithCount referred, byte[] name,
+        int lastSnapshotId) {
       super(parent, referred);
       this.name = name;
+      this.lastSnapshotId = lastSnapshotId;
+      referred.addReference(this);
     }
 
     @Override
@@ -349,6 +406,36 @@ public abstract class INodeReference extends INode {
       throw new UnsupportedOperationException("Cannot set name: " + getClass()
           + " is immutable.");
     }
+    
+    public int getLastSnapshotId() {
+      return lastSnapshotId;
+    }
+    
+    @Override
+    public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+        boolean useCache, int lastSnapshotId) {
+      Preconditions.checkState(lastSnapshotId == Snapshot.INVALID_ID
+          || this.lastSnapshotId <= lastSnapshotId);
+      final INode referred = this.getReferredINode().asReference()
+          .getReferredINode();
+      // we cannot use cache for the referred node since its cached quota may
+      // have already been updated by changes in the current tree
+      return referred.computeQuotaUsage(counts, false, this.lastSnapshotId);
+    }
+    
+    @Override
+    public Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
+        BlocksMapUpdateInfo collectedBlocks, List removedINodes)
+        throws QuotaExceededException {
+      Quota.Counts counts = getReferredINode().cleanSubtree(snapshot, prior,
+          collectedBlocks, removedINodes);
+      INodeReference ref = getReferredINode().getParentReference();
+      if (ref != null) {
+        ref.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
+            -counts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID);
+      }
+      return counts;
+    }
   }
   
   public static class DstReference extends INodeReference {
@@ -373,6 +460,22 @@ public abstract class INodeReference extends INode {
         final int dstSnapshotId) {
       super(parent, referred);
       this.dstSnapshotId = dstSnapshotId;
+      referred.addReference(this);
+    }
+    
+    @Override
+    public Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
+        BlocksMapUpdateInfo collectedBlocks, List removedINodes)
+        throws QuotaExceededException {
+      Quota.Counts counts = getReferredINode().cleanSubtree(snapshot, prior,
+          collectedBlocks, removedINodes);
+      if (snapshot != null) {
+        // also need to update quota usage along the corresponding WithName node
+        WithCount wc = (WithCount) getReferredINode();
+        wc.addSpaceConsumedToRenameSrc(-counts.get(Quota.NAMESPACE),
+            -counts.get(Quota.DISKSPACE), true, snapshot.getId());
+      }
+      return counts;
     }
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
index b58691e6190..aee1533db40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
@@ -89,7 +89,7 @@ public class INodeSymlink extends INodeWithAdditionalFields {
 
   @Override
   public Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean updateCache) {
+      boolean updateCache, int lastSnapshotId) {
     counts.add(Quota.NAMESPACE, 1);
     return counts;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
index b9be1d57f2c..b0cc05e6395 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
@@ -68,7 +68,8 @@ abstract class AbstractINodeDiffList removedINodes) {
+      final BlocksMapUpdateInfo collectedBlocks, final List removedINodes)
+      throws QuotaExceededException {
     int snapshotIndex = Collections.binarySearch(diffs, snapshot.getId());
     
     Quota.Counts counts = Quota.Counts.newInstance();
@@ -80,6 +81,13 @@ abstract class AbstractINodeDiffList deletedList = getList(ListType.DELETED);
       for (INode d : deletedList) {
-        if (INodeReference.tryRemoveReference(d) <= 0) {
-          d.computeQuotaUsage(counts, false);
-          d.destroyAndCollectBlocks(collectedBlocks, removedINodes);
-        } else {
-          refNodes.add(d.asReference());
+        d.computeQuotaUsage(counts, false);
+        d.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+        if (d.isReference()) {
+          INodeReference.WithCount wc = 
+              (INodeReference.WithCount) d.asReference().getReferredINode();
+          if (wc.getReferenceCount() > 0) {
+            refNodes.add(d.asReference());
+          }
         }
       }
       deletedList.clear();
@@ -271,25 +275,36 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
         @Override
         public void process(INode inode) {
           if (inode != null) {
-            if (INodeReference.tryRemoveReference(inode) <= 0) {
-              inode.computeQuotaUsage(counts, false);
-              inode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
-            } else {
-              // if the node is a reference node, we should continue the 
-              // snapshot deletion process
-              try {
-                // use null as prior here because we are handling a reference
-                // node stored in the created list of a snapshot diff. This 
-                // snapshot diff must be associated with the latest snapshot of
-                // the dst tree before the rename operation. In this scenario,
-                // the prior snapshot should be the one created in the src tree,
-                // and it can be identified by the cleanSubtree since we call
-                // recordModification before the rename.
-                counts.add(inode.cleanSubtree(posterior.snapshot, null,
-                    collectedBlocks, removedINodes));
-              } catch (QuotaExceededException e) {
-                String error = "should not have QuotaExceededException while deleting snapshot";
-                LOG.error(error, e);
+            inode.computeQuotaUsage(counts, false);
+            inode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+
+            boolean handleRef = false;
+            if (inode.isReference()) {
+              INodeReference.WithCount wc = (INodeReference.WithCount) inode
+                  .asReference().getReferredINode();
+              if (wc.getReferenceCount() > 0) {
+                handleRef = true;
+              }
+            }
+
+            if (handleRef) {
+              final Snapshot postSnapshot = posterior.snapshot;
+              if (inode instanceof INodeReference.DstReference) {
+                // we are handling a reference node and its subtree stored in
+                // the created list of a snapshot diff, which must be associated
+                // with the latest snapshot of the dst tree before the rename 
+                // operation.
+                destroyDstSnapshot(inode, postSnapshot, null, collectedBlocks,
+                    removedINodes);
+              } else if (inode instanceof INodeReference.WithName) {
+                // the inode should be renamed again. We only need to delete
+                // postSnapshot in its subtree.
+                try {
+                  inode.cleanSubtree(postSnapshot, null, collectedBlocks,
+                      removedINodes);
+                } catch (QuotaExceededException e) {
+                  LOG.error("Error: should not throw QuotaExceededException", e);
+                }
               }
             }
           }
@@ -297,6 +312,74 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       });
       return counts;
     }
+    
+    /**
+     * For a reference node, delete its first snapshot associated with the dst
+     * tree of a rename operation, i.e., the snapshot diff is associated with
+     * the latest snapshot of the dst tree before the rename operation. The
+     * difference between this process and a regular snapshot deletion
+     * process is that we need to delete everything created after the rename,
+     * i.e., we should destroy the whole created list of the referred node.
+     */
+    private Quota.Counts destroyDstSnapshot(INode inode, final Snapshot snapshot, 
+        Snapshot prior, final BlocksMapUpdateInfo collectedBlocks,
+        final List removedINodes) {
+      Quota.Counts counts = Quota.Counts.newInstance();
+      try {
+        if (inode.isReference()) {
+          INodeReference referenceNode = inode.asReference(); 
+          INodeReference.WithCount wc = 
+              (WithCount) referenceNode.getReferredINode();
+          INode referred = wc.getReferredINode();
+          Quota.Counts subCounts = destroyDstSnapshot(referred, snapshot,
+              prior, collectedBlocks, removedINodes);
+          if (inode instanceof INodeReference.WithName) {
+            INodeReference ref = wc.getParentReference();
+            if (ref != null) {
+              ref.addSpaceConsumed(-subCounts.get(Quota.NAMESPACE),
+                  -subCounts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID);
+            }
+          } else if (inode instanceof INodeReference.DstReference) {
+            wc.addSpaceConsumedToRenameSrc(-counts.get(Quota.NAMESPACE),
+                -counts.get(Quota.DISKSPACE), true, snapshot.getId());
+          }
+        } else if (inode.isFile()) { // file and not reference
+          counts.add(inode.cleanSubtree(snapshot, null, collectedBlocks,
+              removedINodes));
+        } else if (inode.isDirectory()) { // directory and not reference
+          if (inode.asDirectory() instanceof INodeDirectoryWithSnapshot) {
+            INodeDirectoryWithSnapshot dirNode = 
+                (INodeDirectoryWithSnapshot) inode.asDirectory();
+            DirectoryDiffList diffList = dirNode.getDiffs();
+            prior = diffList.updatePrior(snapshot, prior);
+            counts.add(diffList.deleteSnapshotDiff(snapshot, prior, dirNode, 
+                collectedBlocks, removedINodes));
+            if (prior != null) {
+              DirectoryDiff priorDiff = diffList.getDiff(prior);
+              if (priorDiff != null) {
+                // destroy everything in the created list!
+                counts.add(priorDiff.diff.destroyCreatedList(dirNode,
+                    collectedBlocks, removedINodes));
+                for (INode dNode : priorDiff.getChildrenDiff().getList(
+                    ListType.DELETED)) {
+                  counts.add(cleanDeletedINode(dNode, snapshot, prior,
+                      collectedBlocks, removedINodes));
+                }
+              }
+            }
+          }
+          Snapshot s = snapshot != null && prior != null ? prior : snapshot;
+          for (INode child : inode.asDirectory().getChildrenList(s)) {
+            counts.add(destroyDstSnapshot(child, s, prior, collectedBlocks,
+                removedINodes));
+          }
+        }
+      } catch (QuotaExceededException e) {
+        String error = "should not have QuotaExceededException while deleting snapshot";
+        LOG.error(error, e);
+      }
+      return counts;
+    }
 
     /**
      * @return The children list of a directory in a snapshot.
@@ -395,22 +478,19 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       for (INodeReference ref : refNodes) {
         // if the node is a reference node, we should continue the 
         // snapshot deletion process
-        try {
-          // Use null as prior snapshot. We are handling a reference node stored
-          // in the delete list of this snapshot diff. We need to destroy this 
-          // snapshot diff because it is the very first one in history.
-          // If the ref node is a WithName instance acting as the src node of
-          // the rename operation, there will not be any snapshot before the
-          // snapshot to be deleted. If the ref node presents the dst node of a 
-          // rename operation, we can identify the corresponding prior snapshot 
-          // when we come into the subtree of the ref node.
-          counts.add(ref.cleanSubtree(this.snapshot, null, collectedBlocks,
-              removedINodes));
-        } catch (QuotaExceededException e) {
-          String error = 
-              "should not have QuotaExceededException while deleting snapshot " 
-              + this.snapshot;
-          LOG.error(error, e);
+        if (ref instanceof INodeReference.DstReference) {
+          // if ref is a DstReference instance, we should delete all the files
+          // created after the rename
+          destroyDstSnapshot(ref, snapshot, null, collectedBlocks,
+              removedINodes);
+        } else if (ref instanceof INodeReference.WithName) {
+          // ref should have been renamed again. We only need to delete
+          // the snapshot in its subtree.
+          try {
+            ref.cleanSubtree(snapshot, null, collectedBlocks, removedINodes);
+          } catch (QuotaExceededException e) {
+            LOG.error("Error: should not throw QuotaExceededException", e);
+          }
         }
       }
       return counts;
@@ -689,7 +769,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     if (added && !removeDeletedChild) {
       final Quota.Counts counts = deletedChild.computeQuotaUsage();
       addSpaceConsumed(counts.get(Quota.NAMESPACE),
-          counts.get(Quota.DISKSPACE), false);
+          counts.get(Quota.DISKSPACE), false, Snapshot.INVALID_ID);
     }
   }
 
@@ -796,16 +876,16 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
    * @param collectedBlocks Used to collect blocks for later deletion.
    * @return Quota usage update.
    */
-  private Quota.Counts cleanDeletedINode(INode inode, Snapshot post,
+  private static Quota.Counts cleanDeletedINode(INode inode, Snapshot post,
       Snapshot prior, final BlocksMapUpdateInfo collectedBlocks,
-      final List removedINodes) {
+      final List removedINodes) throws QuotaExceededException {
     Quota.Counts counts = Quota.Counts.newInstance();
     Deque queue = new ArrayDeque();
     queue.addLast(inode);
     while (!queue.isEmpty()) {
       INode topNode = queue.pollFirst();
-      if (topNode instanceof FileWithSnapshot) {
-        FileWithSnapshot fs = (FileWithSnapshot) topNode;
+      if (topNode.isFile() && topNode.asFile() instanceof FileWithSnapshot) {
+        FileWithSnapshot fs = (FileWithSnapshot) topNode.asFile();
         counts.add(fs.getDiffs().deleteSnapshotDiff(post, prior,
             topNode.asFile(), collectedBlocks, removedINodes));
       } else if (topNode.isDirectory()) {
@@ -840,13 +920,41 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     super.destroyAndCollectBlocks(collectedBlocks, removedINodes);
   }
 
+  @Override
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache, int lastSnapshotId) {
+    if ((useCache && isQuotaSet()) || lastSnapshotId == Snapshot.INVALID_ID) {
+      return super.computeQuotaUsage(counts, useCache, lastSnapshotId);
+    }
+    
+    final int diffNum = 0;
+    Snapshot lastSnapshot = null;
+    Snapshot lastInDiff = diffs.getLastSnapshot();
+    // if lastSnapshotId > lastInDiff.getId(), the snapshot diff associated with
+    // lastSnapshotId must have been deleted. We should call 
+    // getChildrenList(null) to get the children list for the continuous 
+    // computation. In the meanwhile, there must be some snapshot diff whose
+    // snapshot id is no less than lastSnapshotId. Otherwise the WithName node
+    // itself should have been deleted.
+    if (lastInDiff != null && lastInDiff.getId() >= lastSnapshotId) {
+      lastSnapshot = diffs.searchSnapshotById(lastSnapshotId);
+    }
+    
+    ReadOnlyList childrenList = getChildrenList(lastSnapshot);
+    for (INode child : childrenList) {
+      child.computeQuotaUsage(counts, useCache, lastSnapshotId);
+    }
+    
+    counts.add(Quota.NAMESPACE, diffNum + 1);
+    return counts;
+  }
+  
   @Override
   public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
     super.computeQuotaUsage4CurrentDirectory(counts);
-
     for(DirectoryDiff d : diffs) {
       for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) {
-        deleted.computeQuotaUsage(counts, false);
+        deleted.computeQuotaUsage(counts, false, Snapshot.INVALID_ID);
       }
     }
     counts.add(Quota.NAMESPACE, diffs.asList().size());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
index cdb2c77ddee..a5830cf26dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
@@ -125,6 +125,13 @@ public class Snapshot implements Comparable {
     }
     return latest;
   }
+  
+  static Snapshot read(DataInput in, FSImageFormat.Loader loader)
+      throws IOException {
+    final int snapshotId = in.readInt();
+    final INode root = loader.loadINodeWithLocalName(false, in);
+    return new Snapshot(snapshotId, root.asDirectory(), null);
+  }
 
   /** The root directory of the snapshot. */
   static public class Root extends INodeDirectory {
@@ -205,10 +212,4 @@ public class Snapshot implements Comparable {
     // write root
     FSImageSerialization.writeINodeDirectory(root, out);
   }
-  
-  static Snapshot read(DataInput in, FSImageFormat.Loader loader) throws IOException {
-    final int snapshotId = in.readInt();
-    final INode root = loader.loadINodeWithLocalName(false, in);
-    return new Snapshot(snapshotId, root.asDirectory(), null);
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
index bbb0c41e7c6..b3a40d6bcbf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
@@ -354,7 +354,6 @@ public class SnapshotFSImageFormat {
       } else {
         final long id = in.readLong();
         withCount = referenceMap.get(id);
-        withCount.incrementReferenceCount();
       }
       return withCount;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
index 91a0665e64b..8dad3af8a06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyBoolean;
@@ -61,6 +62,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /** Testing rename with snapshots. */
 public class TestRenameWithSnapshots {
@@ -418,14 +420,16 @@ public class TestRenameWithSnapshots {
     final Path newfoo = new Path(sdir1, "foo");
     hdfs.rename(foo, newfoo);
     
-    final Path bar3 = new Path(newfoo, "bar3");
-    DFSTestUtil.createFile(hdfs, bar3, BLOCKSIZE, REPL, SEED);
+    final Path newbar = new Path(newfoo, bar.getName());
+    final Path newbar2 = new Path(newfoo, bar2.getName());
+    final Path newbar3 = new Path(newfoo, "bar3");
+    DFSTestUtil.createFile(hdfs, newbar3, BLOCKSIZE, REPL, SEED);
     
     hdfs.createSnapshot(sdir1, "s4");
-    hdfs.delete(bar, true);
-    hdfs.delete(bar3, true);
+    hdfs.delete(newbar, true);
+    hdfs.delete(newbar3, true);
     
-    assertFalse(hdfs.exists(bar3));
+    assertFalse(hdfs.exists(newbar3));
     assertFalse(hdfs.exists(bar));
     final Path bar_s4 = SnapshotTestHelper.getSnapshotPath(sdir1, "s4",
         "foo/bar");
@@ -435,7 +439,7 @@ public class TestRenameWithSnapshots {
     assertTrue(hdfs.exists(bar3_s4));
     
     hdfs.createSnapshot(sdir1, "s5");
-    hdfs.delete(bar2, true);
+    hdfs.delete(newbar2, true);
     assertFalse(hdfs.exists(bar2));
     final Path bar2_s5 = SnapshotTestHelper.getSnapshotPath(sdir1, "s5",
         "foo/bar2");
@@ -527,8 +531,8 @@ public class TestRenameWithSnapshots {
     // dump the namespace loaded from fsimage
     SnapshotTestHelper.dumpTree2File(fsdir, fsnAfter);
     
-    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnMiddle, false);
-    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter, false);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnMiddle, true);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter, true);
   }
   
   /**
@@ -832,6 +836,9 @@ public class TestRenameWithSnapshots {
     hdfs.setReplication(bar1_dir2, REPL_1);
     hdfs.setReplication(bar_dir2, REPL_1);
     
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
     // create snapshots
     SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s11");
     SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s22");
@@ -848,6 +855,9 @@ public class TestRenameWithSnapshots {
     hdfs.setReplication(bar1_dir3, REPL_2);
     hdfs.setReplication(bar_dir3, REPL_2);
     
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
     // create snapshots
     SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s111");
     SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s222");
@@ -894,11 +904,14 @@ public class TestRenameWithSnapshots {
     // 3. /dir3/foo -> /dir2/foo
     hdfs.rename(foo_dir3, foo_dir2);
     hdfs.rename(bar_dir3, bar_dir2);
-    
-    // modification on /dir2/foo
+   
+//    // modification on /dir2/foo
     hdfs.setReplication(bar1_dir2, REPL);
     hdfs.setReplication(bar_dir2, REPL);
     
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
     // create snapshots
     SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1111");
     SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2222");
@@ -1386,4 +1399,61 @@ public class TestRenameWithSnapshots {
     assertEquals("s1", fooDiffs.get(0).snapshot.getRoot().getLocalName());
     assertEquals("s3", fooDiffs.get(1).snapshot.getRoot().getLocalName());
   }
+  
+  /**
+   * Test undo where dst node being overwritten is a reference node
+   */
+  @Test
+  public void testRenameUndo_4() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    final Path sdir3 = new Path("/dir3");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    hdfs.mkdirs(sdir3);
+    
+    final Path foo = new Path(sdir1, "foo");
+    final Path bar = new Path(foo, "bar");
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED);
+    
+    final Path foo2 = new Path(sdir2, "foo");
+    hdfs.mkdirs(foo2);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2");
+    
+    // rename foo2 to foo3, so that foo3 will be a reference node
+    final Path foo3 = new Path(sdir3, "foo");
+    hdfs.rename(foo2, foo3);
+    
+    INode foo3Node = fsdir.getINode4Write(foo3.toString());
+    assertTrue(foo3Node.isReference());
+    
+    INodeDirectory dir3 = fsdir.getINode4Write(sdir3.toString()).asDirectory();
+    INodeDirectory mockDir3 = spy(dir3);
+    // fail the rename but succeed in undo
+    doReturn(false).when(mockDir3).addChild((INode) Mockito.isNull(),
+        anyBoolean(), (Snapshot) anyObject());
+    Mockito.when(mockDir3.addChild((INode) Mockito.isNotNull(), anyBoolean(),
+        (Snapshot) anyObject())).thenReturn(false).thenCallRealMethod();
+    INodeDirectory root = fsdir.getINode4Write("/").asDirectory();
+    root.replaceChild(dir3, mockDir3);
+    foo3Node.setParent(mockDir3);
+    
+    try {
+      hdfs.rename(foo, foo3, Rename.OVERWRITE);
+      fail("the rename from " + foo + " to " + foo3 + " should fail");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("rename from " + foo + " to "
+          + foo3 + " failed.", e);
+    }
+    
+    // make sure the undo is correct
+    final INode foo3Node_undo = fsdir.getINode4Write(foo3.toString());
+    assertSame(foo3Node, foo3Node_undo);
+    INodeReference.WithCount foo3_wc = (WithCount) foo3Node.asReference()
+        .getReferredINode();
+    assertEquals(2, foo3_wc.getReferenceCount());
+    assertSame(foo3Node, foo3_wc.getParentReference());
+  }
 }