HDFS-4077. Add support for Snapshottable Directory. Contributed by Nicholas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1400318 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2012-10-19 22:21:50 +00:00
parent f5c4defcb3
commit af130d4baf
5 changed files with 96 additions and 28 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -9,3 +9,5 @@ Branch-2802 Snapshot (Unreleased)
   directory. (Brandon Li via suresh)
 
   HDFS-4083. Protocol changes for snapshots. (suresh)
+
+  HDFS-4077. Add support for Snapshottable Directory. (Nicholas via suresh)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1151,27 +1151,51 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * Replaces the specified inode with the specified one.
+   * Replaces the specified INode.
    */
-  public void replaceNode(String path, INodeFile oldnode, INodeFile newnode)
-      throws IOException, UnresolvedLinkException {
+  private void replaceINodeUnsynced(String path, INode oldnode, INode newnode
+      ) throws IOException {
+    //remove the old node from the namespace
+    if (!oldnode.removeNode()) {
+      final String mess = "FSDirectory.replaceINodeUnsynced: failed to remove "
+          + path;
+      NameNode.stateChangeLog.warn("DIR* " + mess);
+      throw new IOException(mess);
+    }
+
+    //add the new node
+    rootDir.addNode(path, newnode);
+  }
+
+  /**
+   * Replaces the specified INodeDirectory.
+   */
+  public void replaceINodeDirectory(String path, INodeDirectory oldnode,
+      INodeDirectory newnode) throws IOException {
     writeLock();
     try {
-      //
-      // Remove the node from the namespace
-      //
-      if (!oldnode.removeNode()) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.replaceNode: " +
-                                     "failed to remove " + path);
-        throw new IOException("FSDirectory.replaceNode: " +
-                              "failed to remove " + path);
-      }
-
-      /* Currently oldnode and newnode are assumed to contain the same
-       * blocks. Otherwise, blocks need to be removed from the blocksMap.
-       */
-      rootDir.addNode(path, newnode);
+      replaceINodeUnsynced(path, oldnode, newnode);
+
+      //update children's parent directory
+      for(INode i : newnode.getChildren()) {
+        i.parent = newnode;
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Replaces the specified INodeFile with the specified one.
+   */
+  public void replaceNode(String path, INodeFile oldnode, INodeFile newnode
+      ) throws IOException {
+    writeLock();
+    try {
+      replaceINodeUnsynced(path, oldnode, newnode);
+
+      //Currently, oldnode and newnode are assumed to contain the same blocks.
+      //Otherwise, blocks need to be removed from the blocksMap.
       int index = 0;
      for (BlockInfo b : newnode.getBlocks()) {
        BlockInfo info = getBlockManager().addBlockCollection(b, newnode);
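
For orientation, the refactoring above splits node replacement into a lock-free core and locked wrappers: replaceINodeUnsynced performs the raw remove-then-add swap, while the public replaceINodeDirectory and replaceNode take the write lock and apply the type-specific fixups (re-parenting children for directories, re-registering blocks for files). A minimal, self-contained sketch of that pattern follows; the Sketch* types are hypothetical stand-ins, not the HDFS classes.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SketchNode {
  SketchNode parent;
  final List<SketchNode> children = new ArrayList<>();
}

class SketchDirectoryTree {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // Analogue of replaceINodeUnsynced: swap without taking the lock;
  // callers are responsible for holding it.
  private void replaceUnsynced(SketchNode oldNode, SketchNode newNode)
      throws IOException {
    final SketchNode parent = oldNode.parent;
    if (parent == null || !parent.children.remove(oldNode)) {
      throw new IOException("failed to remove old node");
    }
    parent.children.add(newNode);
    newNode.parent = parent;
  }

  // Analogue of replaceINodeDirectory: lock, swap, then re-parent the
  // children the replacement carried over from the old directory.
  public void replaceDirectory(SketchNode oldDir, SketchNode newDir)
      throws IOException {
    lock.writeLock().lock();
    try {
      replaceUnsynced(oldDir, newDir);
      for (SketchNode child : newDir.children) {
        child.parent = newDir;
      }
    } finally {
      lock.writeLock().unlock();
    }
  }
}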

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -17,20 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@@ -169,6 +169,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -2961,7 +2962,30 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     getEditLog().logSync();
   }
 
+  /**
+   * Set the given directory as a snapshottable directory.
+   * If the path is already a snapshottable directory, this is a no-op.
+   * Otherwise, the {@link INodeDirectory} of the path is replaced by an
+   * {@link INodeDirectorySnapshottable}.
+   */
+  void setSnapshottable(final String path) throws IOException {
+    writeLock();
+    try {
+      final INodeDirectory d = INodeDirectory.valueOf(dir.getINode(path), path);
+      if (d.isSnapshottable()) {
+        //The directory is already a snapshottable directory.
+        return;
+      }
+
+      final INodeDirectorySnapshottable s
+          = INodeDirectorySnapshottable.newInstance(d);
+      dir.replaceINodeDirectory(path, d, s);
+    } finally {
+      writeUnlock();
+    }
+  }
+
   /** Persist all metadata about this file.
    * @param src The string representation of the path
    * @param clientName The string representation of the client
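
setSnapshottable above follows a check-then-upgrade idiom: resolve the path to a directory, return early if it is already snapshottable, otherwise build the richer type from the existing node and splice it in through replaceINodeDirectory, all under the write lock. INodeDirectorySnapshottable.newInstance is not part of this diff, so the sketch below uses hypothetical Dir types and assumes the snapshottable subtype is constructed from the plain one.

class Dir {
  boolean isSnapshottable() { return false; }
}

// Hypothetical analogue of INodeDirectorySnapshottable: same data,
// extra capability.
class SnapshottableDir extends Dir {
  SnapshottableDir(Dir other) { /* copy state from other in a real impl */ }
  @Override
  boolean isSnapshottable() { return true; }
}

class NamespaceSketch {
  private Dir dir = new Dir();

  // Analogue of FSNamesystem.setSnapshottable: a no-op when the directory
  // is already snapshottable, otherwise an in-place subtype upgrade.
  synchronized void setSnapshottable() {
    if (dir.isSnapshottable()) {
      return; // already snapshottable
    }
    dir = new SnapshottableDir(dir);
  }
}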

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -31,10 +32,22 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 /**
  * Directory INode class.
  */
-class INodeDirectory extends INode {
+public class INodeDirectory extends INode {
   protected static final int DEFAULT_FILES_PER_DIRECTORY = 5;
   final static String ROOT_NAME = "";
 
+  /** Cast INode to INodeDirectory. */
+  public static INodeDirectory valueOf(INode inode, String src
+      ) throws IOException {
+    if (inode == null) {
+      throw new FileNotFoundException(src + " does not exist.");
+    }
+    if (!inode.isDirectory()) {
+      throw new IOException(src + " is not a directory.");
+    }
+    return (INodeDirectory)inode;
+  }
+
   private List<INode> children;
 
   INodeDirectory(String name, PermissionStatus permissions) {
@@ -70,6 +83,11 @@ class INodeDirectory extends INode {
     return true;
   }
 
+  /** Is this a snapshottable directory? */
+  public boolean isSnapshottable() {
+    return false;
+  }
+
   INode removeChild(INode node) {
     assert children != null;
     int low = Collections.binarySearch(children, node.name);
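
Two small hooks in this file do a lot of work: valueOf centralizes the two failure cases every caller would otherwise repeat (path missing, path not a directory), and isSnapshottable defaults to false so that only an overriding subclass, such as the snapshottable directory type this change introduces, reports true. A hedged caller sketch; requireDirectory is hypothetical, and getINode is the same lookup setSnapshottable uses above.

// Hypothetical helper, assumed to live in the same package as FSDirectory.
static INodeDirectory requireDirectory(FSDirectory fsdir, String path)
    throws IOException {
  // valueOf throws FileNotFoundException if the path does not resolve,
  // and IOException if it resolves to something other than a directory.
  final INodeDirectory d = INodeDirectory.valueOf(fsdir.getINode(path), path);
  // isSnapshottable() is false here unless d is an overriding subclass.
  return d;
}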

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java

@@ -25,7 +25,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 /**
  * Directory INode class that has a quota restriction
  */
-class INodeDirectoryWithQuota extends INodeDirectory {
+public class INodeDirectoryWithQuota extends INodeDirectory {
   private long nsQuota; /// NameSpace quota
   private long nsCount;
   private long dsQuota; /// disk space quota
@@ -37,8 +37,8 @@ class INodeDirectoryWithQuota extends INodeDirectory {
    * @param dsQuota Diskspace quota to be assigned to this inode
    * @param other The other inode from which all other properties are copied
    */
-  INodeDirectoryWithQuota(long nsQuota, long dsQuota, INodeDirectory other)
-      throws QuotaExceededException {
+  protected INodeDirectoryWithQuota(long nsQuota, long dsQuota,
+      INodeDirectory other) {
     super(other);
     INode.DirCounts counts = new INode.DirCounts();
     other.spaceConsumedInTree(counts);
@@ -72,7 +72,7 @@ class INodeDirectoryWithQuota extends INodeDirectory {
    * @return this directory's namespace quota
    */
   @Override
-  long getNsQuota() {
+  public long getNsQuota() {
     return nsQuota;
   }
@@ -80,7 +80,7 @@ class INodeDirectoryWithQuota extends INodeDirectory {
    * @return this directory's diskspace quota
    */
   @Override
-  long getDsQuota() {
+  public long getDsQuota() {
     return dsQuota;
   }
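
Every change in this file widens visibility (public class, protected constructor, public quota getters), which is exactly what a subtype outside this package needs in order to wrap an existing directory. The sketch below shows how such a subclass could plug in; SnapshottableSketch is illustrative and only modeled on what INodeDirectorySnapshottable presumably does, not a copy of it.

package org.apache.hadoop.hdfs.server.namenode.snapshot;

import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;

// Illustrative subclass; the real INodeDirectorySnapshottable may differ.
public class SnapshottableSketch extends INodeDirectoryWithQuota {
  protected SnapshottableSketch(long nsQuota, long dsQuota,
      INodeDirectory other) {
    // callable because the superclass constructor is now protected
    super(nsQuota, dsQuota, other);
  }

  public static SnapshottableSketch newInstance(INodeDirectory dir) {
    long nsQuota = -1L; // -1 conventionally means "no quota" in HDFS
    long dsQuota = -1L;
    if (dir instanceof INodeDirectoryWithQuota) {
      final INodeDirectoryWithQuota q = (INodeDirectoryWithQuota) dir;
      nsQuota = q.getNsQuota(); // readable from outside the package
      dsQuota = q.getDsQuota(); // now that the getters are public
    }
    return new SnapshottableSketch(nsQuota, dsQuota, dir);
  }

  @Override
  public boolean isSnapshottable() {
    return true;
  }
}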