HDFS-4215. Merge r1411947 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1471530 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-04-24 16:46:50 +00:00
parent 876c1c9098
commit 19198ea31d
8 changed files with 78 additions and 102 deletions

View File

@ -65,6 +65,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4206. Change the fields in INode and its subclasses to private.
(szetszwo)
HDFS-4215. Remove locking from addToParent(..) since it is used in image
loading, and add INode.isFile(). (szetszwo)
OPTIMIZATIONS
BUG FIXES

View File

@ -74,6 +74,10 @@ import com.google.common.base.Preconditions;
*
*************************************************/
public class FSDirectory implements Closeable {
private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) {
return new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
namesystem.createFsOwnerPermissions(new FsPermission((short)0755)));
}
INodeDirectoryWithQuota rootDir;
FSImage fsImage;
@ -122,9 +126,7 @@ public class FSDirectory implements Closeable {
FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
this.dirLock = new ReentrantReadWriteLock(true); // fair
this.cond = dirLock.writeLock().newCondition();
rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
ns.createFsOwnerPermissions(new FsPermission((short)0755)),
Integer.MAX_VALUE, UNKNOWN_DISK_SPACE);
rootDir = createRoot(ns);
this.fsImage = fsImage;
int configuredLimit = conf.getInt(
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
@ -305,35 +307,6 @@ public class FSDirectory implements Closeable {
return newNode;
}
INodeDirectory addToParent(INodeDirectory parentINode,
INode newNode, boolean propagateModTime) {
// NOTE: This does not update space counts for parents
INodeDirectory newParent = null;
writeLock();
try {
try {
newParent = rootDir.addToParent(newNode, parentINode,
propagateModTime);
cacheName(newNode);
} catch (FileNotFoundException e) {
return null;
}
if(newParent == null)
return null;
if(!newNode.isDirectory() && !newNode.isSymlink()) {
// Add file->block mapping
INodeFile newF = (INodeFile)newNode;
BlockInfo[] blocks = newF.getBlocks();
for (int i = 0; i < blocks.length; i++) {
newF.setBlock(i, getBlockManager().addBlockCollection(blocks[i], newF));
}
}
} finally {
writeUnlock();
}
return newParent;
}
/**
* Add a block to the file. Returns a reference to the added block.
*/
@ -825,11 +798,7 @@ public class FSDirectory implements Closeable {
INode[] inodes = rootDir.getExistingPathINodes(src, true);
INode inode = inodes[inodes.length - 1];
if (inode == null) {
return null;
}
assert !inode.isSymlink();
if (inode.isDirectory()) {
if (inode == null || !inode.isFile()) {
return null;
}
INodeFile fileNode = (INodeFile)inode;
@ -848,22 +817,15 @@ public class FSDirectory implements Closeable {
}
/**
* Get the blocksize of a file
* @param filename the filename
* @return the number of bytes
* @param path the file path
* @return the block size of the file.
*/
long getPreferredBlockSize(String filename) throws UnresolvedLinkException,
long getPreferredBlockSize(String path) throws UnresolvedLinkException,
FileNotFoundException, IOException {
readLock();
try {
INode inode = rootDir.getNode(filename, false);
if (inode == null) {
throw new FileNotFoundException("File does not exist: " + filename);
}
if (inode.isDirectory() || inode.isSymlink()) {
throw new IOException("Getting block size of non-file: "+ filename);
}
return ((INodeFile)inode).getPreferredBlockSize();
return INodeFile.valueOf(rootDir.getNode(path, false), path
).getPreferredBlockSize();
} finally {
readUnlock();
}
@ -877,9 +839,7 @@ public class FSDirectory implements Closeable {
if (inode == null) {
return false;
}
return inode.isDirectory() || inode.isSymlink()
? true
: ((INodeFile)inode).getBlocks() != null;
return !inode.isFile() || ((INodeFile)inode).getBlocks() != null;
} finally {
readUnlock();
}
@ -1212,14 +1172,8 @@ public class FSDirectory implements Closeable {
waitForReady();
readLock();
try {
INode targetNode = rootDir.getNode(src, false);
if (targetNode == null)
return null;
if (targetNode.isDirectory())
return null;
if (targetNode.isSymlink())
return null;
return ((INodeFile)targetNode).getBlocks();
final INode i = rootDir.getNode(src, false);
return i != null && i.isFile()? ((INodeFile)i).getBlocks(): null;
} finally {
readUnlock();
}
@ -2023,9 +1977,7 @@ public class FSDirectory implements Closeable {
writeLock();
try {
setReady(false);
rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
getFSNamesystem().createFsOwnerPermissions(new FsPermission((short)0755)),
Integer.MAX_VALUE, -1);
rootDir = createRoot(getFSNamesystem());
nameCache.reset();
} finally {
writeUnlock();
@ -2168,7 +2120,7 @@ public class FSDirectory implements Closeable {
*/
void cacheName(INode inode) {
// Name is cached only for files
if (inode.isDirectory() || inode.isSymlink()) {
if (!inode.isFile()) {
return;
}
ByteArray name = new ByteArray(inode.getLocalNameBytes());

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
@ -252,7 +253,7 @@ class FSImageFormat {
fsDir.rootDir.setQuota(nsQuota, dsQuota);
}
fsDir.rootDir.setModificationTime(root.getModificationTime());
fsDir.rootDir.setPermissionStatus(root.getPermissionStatus());
fsDir.rootDir.clonePermissionStatus(root);
}
/**
@ -308,7 +309,7 @@ class FSImageFormat {
// add to parent
newNode.setLocalName(localName);
namesystem.dir.addToParent(parent, newNode, false);
addToParent(parent, newNode);
}
return numChildren;
}
@ -343,7 +344,30 @@ class FSImageFormat {
// add new inode
newNode.setLocalName(pathComponents[pathComponents.length-1]);
parentINode = fsDir.addToParent(parentINode, newNode, false);
addToParent(parentINode, newNode);
}
}
/**
* Add the child node to parent and, if child is a file, update block map.
* This method is only used for image loading so that synchronization,
* modification time update and space count update are not needed.
*/
void addToParent(INodeDirectory parent, INode child) {
// NOTE: This does not update space counts for parents
if (parent.addChild(child, false) == null) {
return;
}
namesystem.dir.cacheName(child);
if (child.isFile()) {
// Add file->block mapping
final INodeFile file = (INodeFile)child;
final BlockInfo[] blocks = file.getBlocks();
final BlockManager bm = namesystem.getBlockManager();
for (int i = 0; i < blocks.length; i++) {
file.setBlock(i, bm.addBlockCollection(blocks[i], file));
}
}
}

View File

@ -86,17 +86,17 @@ abstract class INode implements Comparable<byte[]> {
return (record & ~MASK) | (bits << OFFSET);
}
/** Set the {@link PermissionStatus} */
/** Encode the {@link PermissionStatus} to a long. */
static long toLong(PermissionStatus ps) {
long permission = 0L;
final int user = SerialNumberManager.INSTANCE.getUserSerialNumber(
ps.getUserName());
permission = PermissionStatusFormat.USER.combine(user, permission);
permission = USER.combine(user, permission);
final int group = SerialNumberManager.INSTANCE.getGroupSerialNumber(
ps.getGroupName());
permission = PermissionStatusFormat.GROUP.combine(group, permission);
permission = GROUP.combine(group, permission);
final int mode = ps.getPermission().toShort();
permission = PermissionStatusFormat.MODE.combine(mode, permission);
permission = MODE.combine(mode, permission);
return permission;
}
}
@ -110,8 +110,9 @@ abstract class INode implements Comparable<byte[]> {
*/
private byte[] name = null;
/**
* Permission encoded using PermissionStatusFormat.
* Codes other than {@link #updatePermissionStatus(PermissionStatusFormat, long)}.
* Permission encoded using {@link PermissionStatusFormat}.
* Codes other than {@link #clonePermissionStatus(INode)}
* and {@link #updatePermissionStatus(PermissionStatusFormat, long)}
* should not modify it.
*/
private long permission = 0L;
@ -155,11 +156,9 @@ abstract class INode implements Comparable<byte[]> {
return name.length == 0;
}
/** Set the {@link PermissionStatus} */
protected void setPermissionStatus(PermissionStatus ps) {
setUser(ps.getUserName());
setGroup(ps.getGroupName());
setPermission(ps.getPermission());
/** Clone the {@link PermissionStatus}. */
void clonePermissionStatus(INode that) {
this.permission = that.permission;
}
/** Get the {@link PermissionStatus} */
protected PermissionStatus getPermissionStatus() {
@ -201,6 +200,13 @@ abstract class INode implements Comparable<byte[]> {
updatePermissionStatus(PermissionStatusFormat.MODE, permission.toShort());
}
/**
* Check whether it's a file.
*/
public boolean isFile() {
return false;
}
/**
* Check whether it's a directory
*/

View File

@ -317,23 +317,6 @@ class INodeDirectory extends INode {
return addToParent(pathComponents, newNode, true) == null? null: newNode;
}
/**
* Add new inode to the parent if specified.
* Optimized version of addNode() if parent is not null.
*
* @return parent INode if new inode is inserted
* or null if it already exists.
* @throws FileNotFoundException if parent does not exist or
* is not a directory.
*/
INodeDirectory addToParent(INode newNode, INodeDirectory parent,
boolean propagateModTime) throws FileNotFoundException {
// insert into the parent children list
if(parent.addChild(newNode, propagateModTime) == null)
return null;
return parent;
}
INodeDirectory getParent(byte[][] pathComponents
) throws FileNotFoundException, UnresolvedLinkException {
if (pathComponents.length < 2) // add root

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@ -26,9 +27,13 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
* Directory INode class that has a quota restriction
*/
class INodeDirectoryWithQuota extends INodeDirectory {
private long nsQuota; /// NameSpace quota
/** Name space quota */
private long nsQuota = Long.MAX_VALUE;
/** Name space count */
private long nsCount = 1L;
private long dsQuota; /// disk space quota
/** Disk space quota */
private long dsQuota = HdfsConstants.QUOTA_RESET;
/** Disk space count */
private long diskspace = 0L;
/** Convert an existing directory inode to one with the given quota
@ -57,11 +62,8 @@ class INodeDirectoryWithQuota extends INodeDirectory {
}
/** constructor with no quota verification */
INodeDirectoryWithQuota(String name, PermissionStatus permissions,
long nsQuota, long dsQuota) {
INodeDirectoryWithQuota(String name, PermissionStatus permissions) {
super(name, permissions);
this.nsQuota = nsQuota;
this.dsQuota = dsQuota;
}
/** Get this directory's namespace quota

View File

@ -94,6 +94,12 @@ class INodeFile extends INode implements BlockCollection {
this.blocks = blklist;
}
/** @return true unconditionally. */
@Override
public final boolean isFile() {
return true;
}
/**
* Set the {@link FsPermission} of this {@link INodeFile}.
* Since this is a file,

View File

@ -73,7 +73,7 @@ public class TestFsLimits {
fileAsURI(new File(MiniDFSCluster.getBaseDirectory(),
"namenode")).toString());
rootInode = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME, perms, 0L, 0L);
rootInode = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME, perms);
inodes = new INode[]{ rootInode, null };
fs = null;
fsIsReady = true;