HDFS-4092. Update file deletion logic for snapshot so that the current inode is removed from the circular linked list; and if some blocks at the end of the block list no longer belong to any other inode, collect them and update the block list.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1402287 13f79535-47bb-0310-9956-ffa450edef68
Author: Tsz-wo Sze
Date:   2012-10-25 19:32:26 +00:00
Parent: 40d2b6f308
Commit: 719279ea8a

7 changed files with 94 additions and 19 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -28,3 +28,8 @@ Branch-2802 Snapshot (Unreleased)
   (szetszwo)
 
   HDFS-4097. Provide CLI support for createSnapshot. (Brandon Li via suresh)
+
+  HDFS-4092. Update file deletion logic for snapshot so that the current inode
+  is removed from the circular linked list; and if some blocks at the end of the
+  block list no longer belong to any other inode, collect them and update the
+  block list. (szetszwo)
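
The deletion logic described above works on a circular linked list that chains a file's snapshot copies together, with all copies sharing a prefix of one block list. The following is a minimal standalone sketch of that removal step (plain Java with simplified names; not the actual HDFS classes):

    import java.util.ArrayList;
    import java.util.List;

    /** Simplified model of a snapshot copy on the circular linked list. */
    class FileCopy {
      long[] blockSizes;      // sizes of the blocks this copy references
      FileCopy next = this;   // a lone copy points to itself

      FileCopy(long... blockSizes) { this.blockSizes = blockSizes; }

      long fileSize() {
        long size = 0;
        for (long b : blockSizes) size += b;
        return size;
      }

      /** Unlink this copy; return sizes of blocks no remaining copy needs. */
      List<Long> delete() {
        List<Long> collected = new ArrayList<Long>();
        if (next == this) {
          // Only copy left: every block becomes garbage.
          for (long b : blockSizes) collected.add(b);
        } else {
          // Max file size among the other copies, and the copy before this one.
          long max = 0;
          FileCopy last = this;
          for (FileCopy i = next; i != this; i = i.next) {
            max = Math.max(max, i.fileSize());
            last = i;
          }
          // Minimum n such that the first n blocks cover at least max bytes.
          int n = 0;
          for (long size = 0; n < blockSizes.length && max > size; n++) {
            size += blockSizes[n];
          }
          // Blocks n..end are beyond every remaining copy: collect them.
          for (int k = n; k < blockSizes.length; k++) {
            collected.add(blockSizes[k]);
          }
          last.next = next;   // unlink this copy from the circle
          next = this;
        }
        blockSizes = new long[0];
        return collected;
      }
    }

Deleting a copy whose block list is [64, 64, 64] while the largest remaining copy covers only 100 bytes unlinks the node and collects the third block; the INodeFileWithLink change at the end of this commit implements this in collectSubtreeBlocksAndClear.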

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -307,7 +307,7 @@ public class FSDirectory implements Closeable {
       ) throws IOException, QuotaExceededException {
     waitForReady();
-    final INodeFile src = rootDir.getINodeFile(srcPath);
+    final INodeFile src = INodeFile.valueOf(rootDir.getNode(srcPath, false), srcPath);
     INodeFileSnapshot snapshot = new INodeFileSnapshot(src, src.computeFileSize(true));
     writeLock();
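
The per-directory rootDir.getINodeFile helper is removed (see the INodeDirectory change below) in favor of the static INodeFile.valueOf factory used here. Judging from the removed helper, the factory performs the same two checks; this is a sketch of that pattern, and the real method's exception messages may differ:

    /** Sketch of the INodeFile.valueOf(..) pattern, mirroring the removed
     *  INodeDirectory.getINodeFile helper; messages are illustrative. */
    public static INodeFile valueOf(INode inode, String path)
        throws FileNotFoundException {
      if (inode == null) {
        throw new FileNotFoundException("File \"" + path + "\" not found");
      }
      if (!(inode instanceof INodeFile)) {
        throw new FileNotFoundException("Path \"" + path + "\" is not a file");
      }
      return (INodeFile) inode;
    }

Centralizing the checks lets every caller resolve an INode to an INodeFile with consistent error handling instead of each directory method reimplementing it.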

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -170,6 +170,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1398,6 +1399,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new HadoopIllegalArgumentException("concat: target file "
             + target + " is empty");
       }
+      if (trgInode instanceof INodeFileWithLink) {
+        throw new HadoopIllegalArgumentException("concat: target file "
+            + target + " is in a snapshot");
+      }
       long blockSize = trgInode.getPreferredBlockSize();
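
A file with snapshot copies shares its block array across the linked list, so letting concat splice new blocks onto such a target would silently change every snapshot copy; the NameNode now rejects it up front. A hypothetical client-side view (the paths and catch-all handling are made up for illustration, and fs.defaultFS is assumed to point at an HDFS cluster):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class ConcatGuardExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);

        Path target = new Path("/dir/file");   // hypothetical snapshotted file
        Path[] srcs = { new Path("/tmp/part-0"), new Path("/tmp/part-1") };
        try {
          dfs.concat(target, srcs);
        } catch (Exception e) {
          // With this change the NameNode fails the request with
          // "concat: target file /dir/file is in a snapshot".
          System.err.println("concat rejected: " + e.getMessage());
        }
      }
    }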

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -148,21 +148,6 @@ public class INodeDirectory extends INode {
     return getNode(getPathComponents(path), resolveLink);
   }
 
-  /** @return the INodeFile corresponding to the path. */
-  INodeFile getINodeFile(String path) throws FileNotFoundException,
-      UnresolvedLinkException {
-    final INode inode = getNode(path, false);
-    if (inode == null) {
-      throw new FileNotFoundException("File \"" + path
-          + "\" not found");
-    }
-    if (!(inode instanceof INodeFile)) {
-      throw new FileNotFoundException("Path \"" + path
-          + "\" is not a file");
-    }
-    return (INodeFile)inode;
-  }
-
   /**
    * Retrieve existing INodes from a path. If existing is big enough to store
    * all path components (existing and non-existing), then existing INodes

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -55,7 +55,7 @@ public class INodeFile extends INode implements BlockCollection {
   private long header;
 
-  BlockInfo blocks[] = null;
+  protected BlockInfo[] blocks = null;
 
   INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
       short replication, long modificationTime,
@@ -162,7 +162,7 @@ public class INodeFile extends INode implements BlockCollection {
   }
 
   @Override
-  int collectSubtreeBlocksAndClear(List<Block> v) {
+  protected int collectSubtreeBlocksAndClear(List<Block> v) {
     parent = null;
     if(blocks != null && v != null) {
       for (BlockInfo blk : blocks) {
@@ -192,7 +192,7 @@ public class INodeFile extends INode implements BlockCollection {
   /** Compute file size.
    * May or may not include BlockInfoUnderConstruction.
    */
-  long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+  protected long computeFileSize(boolean includesBlockInfoUnderConstruction) {
     if (blocks == null || blocks.length == 0) {
       return 0;
     }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileSnapshot.java

@@ -32,4 +32,11 @@ public class INodeFileSnapshot extends INodeFileWithLink {
     super(f);
     this.size = size;
   }
+
+  @Override
+  protected long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+    //ignore includesBlockInfoUnderConstruction
+    //since files in a snapshot are considered closed.
+    return size;
+  }
 }
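
Because FSDirectory passes src.computeFileSize(true) when the snapshot is created (see above), a snapshot copy reports the length the file had at snapshot time, no matter what happens to the live file afterwards. A tiny standalone model of that frozen-length behavior (simplified names, not the HDFS classes):

    /** Simplified model: the snapshot records the length once. */
    class LiveFile {
      long length;
      LiveFile(long length) { this.length = length; }
    }

    class SnapshotCopy {
      final long size;                         // like INodeFileSnapshot.size
      SnapshotCopy(LiveFile f) { this.size = f.length; }
      long computeFileSize() { return size; }  // snapshot files are closed
    }

    public class FrozenSizeDemo {
      public static void main(String[] args) {
        LiveFile f = new LiveFile(100);
        SnapshotCopy s = new SnapshotCopy(f);     // snapshot taken at length 100
        f.length += 50;                           // live file grows afterwards
        System.out.println(s.computeFileSize());  // prints 100, not 150
      }
    }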

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithLink.java

@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 
 /**
@@ -65,4 +69,73 @@ public class INodeFileWithLink extends INodeFile {
     }
     return max;
   }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Remove the current inode from the circular linked list.
+   * If some blocks at the end of the block list no longer belong to
+   * any other inode, collect them and update the block list.
+   */
+  @Override
+  protected int collectSubtreeBlocksAndClear(List<Block> v) {
+    if (next == this) {
+      //this is the only remaining inode.
+      super.collectSubtreeBlocksAndClear(v);
+    } else {
+      //There are other inode(s) using the blocks.
+      //Compute max file size excluding this and find the last inode.
+      long max = next.computeFileSize(true);
+      INodeFileWithLink last = next;
+      for(INodeFileWithLink i = next.getNext(); i != this; i = i.getNext()) {
+        final long size = i.computeFileSize(true);
+        if (size > max) {
+          max = size;
+        }
+        last = i;
+      }
+
+      collectBlocksBeyondMaxAndClear(max, v);
+
+      //remove this from the circular linked list.
+      last.next = this.next;
+      this.next = null;
+      //clear parent
+      parent = null;
+    }
+    return 1;
+  }
+
+  private void collectBlocksBeyondMaxAndClear(final long max, final List<Block> v) {
+    if (blocks != null) {
+      //find the minimum n such that the size of the first n blocks >= max
+      int n = 0;
+      for(long size = 0; n < blocks.length && max > size; n++) {
+        size += blocks[n].getNumBytes();
+      }
+
+      //starting from block[n], the data is beyond max.
+      if (n < blocks.length) {
+        //resize the array.
+        final BlockInfo[] newBlocks;
+        if (n == 0) {
+          newBlocks = null;
+        } else {
+          newBlocks = new BlockInfo[n];
+          System.arraycopy(blocks, 0, newBlocks, 0, n);
+        }
+        for(INodeFileWithLink i = next; i != this; i = i.getNext()) {
+          i.blocks = newBlocks;
+        }
+
+        //collect the blocks beyond max.
+        if (v != null) {
+          for(; n < blocks.length; n++) {
+            v.add(blocks[n]);
+          }
+        }
+      }
+      blocks = null;
+    }
+  }
 }
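
To make the trimming loop concrete, here is the same prefix computation run on made-up numbers: three 64-byte blocks in the deleted copy, with the largest remaining copy covering 100 bytes.

    import java.util.ArrayList;
    import java.util.List;

    /** Worked example of the block-trimming loop in
     *  collectBlocksBeyondMaxAndClear, using plain longs for block sizes. */
    public class TrimBeyondMaxDemo {
      public static void main(String[] args) {
        long[] blocks = { 64, 64, 64 }; // deleted copy referenced 3 blocks
        long max = 100;                 // largest remaining copy covers 100 bytes

        // Minimum n such that the first n blocks cover at least max bytes:
        // n=1 -> 64 < 100, keep going; n=2 -> 128 >= 100, stop.
        int n = 0;
        for (long size = 0; n < blocks.length && max > size; n++) {
          size += blocks[n];
        }
        System.out.println("keep first " + n + " blocks"); // keep first 2 blocks

        // Blocks from index n onward belong to no remaining copy; collect them.
        List<Long> collected = new ArrayList<Long>();
        for (int k = n; k < blocks.length; k++) {
          collected.add(blocks[k]);
        }
        System.out.println("collected sizes: " + collected); // [64]
      }
    }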