HDFS-7484. Make FSDirectory#addINode take existing INodes as its parameter. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2014-12-22 23:19:20 -08:00
parent 50ae1a6664
commit 5caebbae8c
10 changed files with 445 additions and 385 deletions

View File

@ -577,7 +577,8 @@ public abstract class SymlinkBaseTest {
} catch (IOException e) {
// Expected
if (wrapper instanceof FileContextTestWrapper) {
assertEquals("No AbstractFileSystem for scheme: null", e.getMessage());
GenericTestUtils.assertExceptionContains(
"No AbstractFileSystem configured for scheme: null", e);
} else if (wrapper instanceof FileSystemTestWrapper) {
assertEquals("No FileSystem for scheme: null", e.getMessage());
}

View File

@ -476,6 +476,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7530. Allow renaming of encryption zone roots. (Charles Lamb via wang)
HDFS-7484. Make FSDirectory#addINode take existing INodes as its parameter.
(jing9)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -17,9 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.commons.io.Charsets;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsAction;
@ -29,18 +30,19 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.util.Time.now;
class FSDirMkdirOp {
static HdfsFileStatus mkdirs(
FSNamesystem fsn, String src, PermissionStatus permissions,
boolean createParent) throws IOException {
static HdfsFileStatus mkdirs(FSNamesystem fsn, String src,
PermissionStatus permissions, boolean createParent) throws IOException {
FSDirectory fsd = fsn.getFSDirectory();
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
@ -50,188 +52,193 @@ class FSDirMkdirOp {
}
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsd.resolvePath(pc, src, pathComponents);
INodesInPath iip = fsd.getINodesInPath4Write(src);
if (fsd.isPermissionEnabled()) {
fsd.checkTraverse(pc, iip);
}
if (!isDirMutable(fsd, iip)) {
if (fsd.isPermissionEnabled()) {
fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
}
if (!createParent) {
fsd.verifyParentDir(iip, src);
}
// validate that we have enough inodes. This is, at best, a
// heuristic because the mkdirs() operation might need to
// create multiple inodes.
fsn.checkFsObjectLimit();
iip = mkdirsRecursively(fsd, iip, permissions, false, now());
if (iip == null) {
throw new IOException("Failed to create directory: " + src);
}
}
return fsd.getAuditFileInfo(iip);
}
static INode unprotectedMkdir(
FSDirectory fsd, long inodeId, String src,
PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
throws QuotaExceededException, UnresolvedLinkException, AclException {
assert fsd.hasWriteLock();
byte[][] components = INode.getPathComponents(src);
final INodesInPath iip = fsd.getExistingPathINodes(components);
final int pos = iip.length() - 1;
final INodesInPath newiip = unprotectedMkdir(fsd, inodeId, iip, pos,
components[pos], permissions, aclEntries, timestamp);
return newiip.getINode(pos);
}
/**
* Create a directory
* If ancestor directories do not exist, automatically create them.
* @param fsd FSDirectory
* @param iip the INodesInPath instance containing all the existing INodes
* and null elements for non-existing components in the path
* @param permissions the permission of the directory
* @param inheritPermission
* if the permission of the directory should inherit from its parent or not.
* u+wx is implicitly added to the automatically created directories,
* and to the given directory if inheritPermission is true
* @param now creation time
* @return non-null INodesInPath instance if operation succeeds
* @throws QuotaExceededException if directory creation violates
* any quota limit
* @throws UnresolvedLinkException if a symlink is encountered in src.
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
static INodesInPath mkdirsRecursively(FSDirectory fsd, INodesInPath iip,
PermissionStatus permissions, boolean inheritPermission, long now)
throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException,
AclException {
final int lastInodeIndex = iip.length() - 1;
final byte[][] components = iip.getPathComponents();
final String[] names = new String[components.length];
for (int i = 0; i < components.length; i++) {
names[i] = DFSUtil.bytes2String(components[i]);
}
fsd.writeLock();
try {
if (iip.isSnapshot()) {
throw new SnapshotAccessControlException(
"Modification on RO snapshot is disallowed");
}
final int length = iip.length();
// find the index of the first null in inodes[]
StringBuilder pathbuilder = new StringBuilder();
int i = 1;
INode curNode;
for(; i < length && (curNode = iip.getINode(i)) != null; i++) {
pathbuilder.append(Path.SEPARATOR).append(names[i]);
if (!curNode.isDirectory()) {
throw new FileAlreadyExistsException("Parent path is not a directory: "
+ pathbuilder + " " + curNode.getLocalName());
}
src = fsd.resolvePath(pc, src, pathComponents);
INodesInPath iip = fsd.getINodesInPath4Write(src);
if (fsd.isPermissionEnabled()) {
fsd.checkTraverse(pc, iip);
}
// default to creating parent dirs with the given perms
PermissionStatus parentPermissions = permissions;
// if not inheriting and it's the last inode, there's no use in
// computing perms that won't be used
if (inheritPermission || (i < lastInodeIndex)) {
// if inheriting (ie. creating a file or symlink), use the parent dir,
// else the supplied permissions
// NOTE: the permissions of the auto-created directories violate posix
FsPermission parentFsPerm = inheritPermission ?
iip.getINode(i-1).getFsPermission() : permissions.getPermission();
// ensure that the permissions allow user write+execute
if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
parentFsPerm = new FsPermission(
parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
parentFsPerm.getGroupAction(),
parentFsPerm.getOtherAction()
);
}
if (!parentPermissions.getPermission().equals(parentFsPerm)) {
parentPermissions = new PermissionStatus(
parentPermissions.getUserName(),
parentPermissions.getGroupName(),
parentFsPerm
);
// when inheriting, use same perms for entire path
if (inheritPermission) permissions = parentPermissions;
}
final INode lastINode = iip.getLastINode();
if (lastINode != null && lastINode.isFile()) {
throw new FileAlreadyExistsException("Path is not a directory: " + src);
}
// create directories beginning from the first null index
for(; i < length; i++) {
pathbuilder.append(Path.SEPARATOR).append(names[i]);
iip = unprotectedMkdir(fsd, fsd.allocateNewInodeId(), iip, i,
components[i], (i < lastInodeIndex) ? parentPermissions :
permissions, null, now);
if (iip.getINode(i) == null) {
return null;
INodesInPath existing = lastINode != null ? iip : iip.getExistingINodes();
if (lastINode == null) {
if (fsd.isPermissionEnabled()) {
fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
}
// Directory creation also count towards FilesCreated
// to match count of FilesDeleted metric.
NameNode.getNameNodeMetrics().incrFilesCreated();
final String cur = pathbuilder.toString();
fsd.getEditLog().logMkDir(cur, iip.getINode(i));
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"mkdirs: created directory " + cur);
if (!createParent) {
fsd.verifyParentDir(iip, src);
}
// validate that we have enough inodes. This is, at best, a
// heuristic because the mkdirs() operation might need to
// create multiple inodes.
fsn.checkFsObjectLimit();
List<String> nonExisting = iip.getPath(existing.length(),
iip.length() - existing.length());
int length = nonExisting.size();
if (length > 1) {
List<String> ancestors = nonExisting.subList(0, length - 1);
// Ensure that the user can traversal the path by adding implicit
// u+wx permission to all ancestor directories
existing = createChildrenDirectories(fsd, existing, ancestors,
addImplicitUwx(permissions, permissions));
if (existing == null) {
throw new IOException("Failed to create directory: " + src);
}
}
if ((existing = createChildrenDirectories(fsd, existing,
nonExisting.subList(length - 1, length), permissions)) == null) {
throw new IOException("Failed to create directory: " + src);
}
}
return fsd.getAuditFileInfo(existing);
} finally {
fsd.writeUnlock();
}
return iip;
}
/**
* Check whether the path specifies a directory
* @throws SnapshotAccessControlException if path is in RO snapshot
* For a given absolute path, create all ancestors as directories along the
* path. All ancestors inherit their parent's permission plus an implicit
* u+wx permission. This is used by create() and addSymlink() for
* implicitly creating all directories along the path.
*
* For example, path="/foo/bar/spam", "/foo" is an existing directory,
* "/foo/bar" is not existing yet, the function will create directory bar.
*
* @return a tuple which contains both the new INodesInPath (with all the
* existing and newly created directories) and the last component in the
* relative path. Or return null if there are errors.
*/
private static boolean isDirMutable(FSDirectory fsd, INodesInPath iip)
throws SnapshotAccessControlException {
fsd.readLock();
try {
INode node = iip.getLastINode();
return node != null && node.isDirectory();
} finally {
fsd.readUnlock();
static Map.Entry<INodesInPath, String> createAncestorDirectories(
FSDirectory fsd, INodesInPath iip, PermissionStatus permission)
throws IOException {
final String last = new String(iip.getLastLocalName(), Charsets.UTF_8);
INodesInPath existing = iip.getExistingINodes();
List<String> children = iip.getPath(existing.length(),
iip.length() - existing.length());
int size = children.size();
if (size > 1) { // otherwise all ancestors have been created
List<String> directories = children.subList(0, size - 1);
INode parentINode = existing.getLastINode();
// Ensure that the user can traversal the path by adding implicit
// u+wx permission to all ancestor directories
existing = createChildrenDirectories(fsd, existing, directories,
addImplicitUwx(parentINode.getPermissionStatus(), permission));
if (existing == null) {
return null;
}
}
return new AbstractMap.SimpleImmutableEntry<>(existing, last);
}
/** create a directory at index pos.
* The parent path to the directory is at [0, pos-1].
* All ancestors exist. Newly created one stored at index pos.
/**
* Create the directory {@code parent} / {@code children} and all ancestors
* along the path.
*
* @param fsd FSDirectory
* @param existing The INodesInPath instance containing all the existing
* ancestral INodes
* @param children The relative path from the parent towards children,
* starting with "/"
* @param perm the permission of the directory. Note that all ancestors
* created along the path has implicit {@code u+wx} permissions.
*
* @return {@link INodesInPath} which contains all inodes to the
* target directory, After the execution parentPath points to the path of
* the returned INodesInPath. The function return null if the operation has
* failed.
*/
private static INodesInPath unprotectedMkdir(
FSDirectory fsd, long inodeId, INodesInPath inodesInPath, int pos,
byte[] name, PermissionStatus permission, List<AclEntry> aclEntries,
long timestamp)
throws QuotaExceededException, AclException {
private static INodesInPath createChildrenDirectories(FSDirectory fsd,
INodesInPath existing, List<String> children, PermissionStatus perm)
throws IOException {
assert fsd.hasWriteLock();
for (String component : children) {
existing = createSingleDirectory(fsd, existing, component, perm);
if (existing == null) {
return null;
}
}
return existing;
}
static void mkdirForEditLog(FSDirectory fsd, long inodeId, String src,
PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
throws QuotaExceededException, UnresolvedLinkException, AclException,
FileAlreadyExistsException {
assert fsd.hasWriteLock();
INodesInPath iip = fsd.getINodesInPath(src, false);
final byte[] localName = iip.getLastLocalName();
final INodesInPath existing = iip.getParentINodesInPath();
Preconditions.checkState(existing.getLastINode() != null);
unprotectedMkdir(fsd, inodeId, existing, localName, permissions, aclEntries,
timestamp);
}
private static INodesInPath createSingleDirectory(FSDirectory fsd,
INodesInPath existing, String localName, PermissionStatus perm)
throws IOException {
assert fsd.hasWriteLock();
existing = unprotectedMkdir(fsd, fsd.allocateNewInodeId(), existing,
localName.getBytes(Charsets.UTF_8), perm, null, now());
if (existing == null) {
return null;
}
final INode newNode = existing.getLastINode();
// Directory creation also count towards FilesCreated
// to match count of FilesDeleted metric.
NameNode.getNameNodeMetrics().incrFilesCreated();
String cur = existing.getPath();
fsd.getEditLog().logMkDir(cur, newNode);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("mkdirs: created directory " + cur);
}
return existing;
}
private static PermissionStatus addImplicitUwx(PermissionStatus parentPerm,
PermissionStatus perm) {
FsPermission p = parentPerm.getPermission();
FsPermission ancestorPerm = new FsPermission(
p.getUserAction().or(FsAction.WRITE_EXECUTE),
p.getGroupAction(),
p.getOtherAction());
return new PermissionStatus(perm.getUserName(), perm.getGroupName(),
ancestorPerm);
}
/**
* create a directory at path specified by parent
*/
private static INodesInPath unprotectedMkdir(FSDirectory fsd, long inodeId,
INodesInPath parent, byte[] name, PermissionStatus permission,
List<AclEntry> aclEntries, long timestamp)
throws QuotaExceededException, AclException, FileAlreadyExistsException {
assert fsd.hasWriteLock();
assert parent.getLastINode() != null;
if (!parent.getLastINode().isDirectory()) {
throw new FileAlreadyExistsException("Parent path is not a directory: " +
parent.getPath() + " " + DFSUtil.bytes2String(name));
}
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
timestamp);
if (fsd.addChild(inodesInPath, pos, dir, true)) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
}
return INodesInPath.replace(inodesInPath, pos, dir);
} else {
return inodesInPath;
INodesInPath iip = fsd.addLastINode(parent, dir, true);
if (iip != null && aclEntries != null) {
AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
}
return iip;
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
@ -124,7 +125,7 @@ class FSDirRenameOp {
*/
@Deprecated
@SuppressWarnings("deprecation")
static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
static boolean renameForEditLog(FSDirectory fsd, String src, String dst,
long timestamp) throws IOException {
if (fsd.isDir(dst)) {
dst += Path.SEPARATOR + new Path(src).getName();
@ -194,11 +195,7 @@ class FSDirRenameOp {
try {
// remove src
final long removedSrc = fsd.removeLastINode(tx.srcIIP);
if (removedSrc == -1) {
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ "failed to rename " + src + " to " + dst + " because the source" +
" can not be removed");
if (!tx.removeSrc4OldRename()) {
return false;
}
@ -256,8 +253,8 @@ class FSDirRenameOp {
}
/**
* @see #unprotectedRenameTo(FSDirectory, String, String, long,
* org.apache.hadoop.fs.Options.Rename...)
* @see {@link #unprotectedRenameTo(FSDirectory, String, String, INodesInPath,
* INodesInPath, long, BlocksMapUpdateInfo, Options.Rename...)}
*/
static void renameTo(FSDirectory fsd, FSPermissionChecker pc, String src,
String dst, BlocksMapUpdateInfo collectedBlocks, boolean logRetryCache,
@ -305,7 +302,7 @@ class FSDirRenameOp {
* @param timestamp modification time
* @param options Rename options
*/
static boolean unprotectedRenameTo(
static boolean renameForEditLog(
FSDirectory fsd, String src, String dst, long timestamp,
Options.Rename... options)
throws IOException {
@ -331,6 +328,7 @@ class FSDirRenameOp {
* @param timestamp modification time
* @param collectedBlocks blocks to be removed
* @param options Rename options
* @return whether a file/directory gets overwritten in the dst path
*/
static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
final INodesInPath srcIIP, final INodesInPath dstIIP, long timestamp,
@ -387,22 +385,14 @@ class FSDirRenameOp {
RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
boolean undoRemoveSrc = true;
final long removedSrc = fsd.removeLastINode(tx.srcIIP);
if (removedSrc == -1) {
error = "Failed to rename " + src + " to " + dst +
" because the source can not be removed";
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
error);
throw new IOException(error);
}
tx.removeSrc();
boolean undoRemoveDst = false;
INode removedDst = null;
long removedNum = 0;
try {
if (dstInode != null) { // dst exists remove it
if ((removedNum = fsd.removeLastINode(tx.dstIIP)) != -1) {
removedDst = tx.dstIIP.getLastINode();
if (dstInode != null) { // dst exists, remove it
removedNum = tx.removeDst();
if (removedNum != -1) {
undoRemoveDst = true;
}
}
@ -419,22 +409,10 @@ class FSDirRenameOp {
// Collect the blocks and remove the lease for previous dst
boolean filesDeleted = false;
if (removedDst != null) {
if (undoRemoveDst) {
undoRemoveDst = false;
if (removedNum > 0) {
List<INode> removedINodes = new ChunkedArrayList<>();
if (!removedDst.isInLatestSnapshot(tx.dstIIP.getLatestSnapshotId())) {
removedDst.destroyAndCollectBlocks(collectedBlocks,
removedINodes);
filesDeleted = true;
} else {
filesDeleted = removedDst.cleanSubtree(
Snapshot.CURRENT_STATE_ID, tx.dstIIP.getLatestSnapshotId(),
collectedBlocks, removedINodes, true)
.get(Quota.NAMESPACE) >= 0;
}
fsd.getFSNamesystem().removePathAndBlocks(src, null,
removedINodes, false);
filesDeleted = tx.cleanDst(collectedBlocks);
}
}
@ -451,22 +429,8 @@ class FSDirRenameOp {
if (undoRemoveSrc) {
tx.restoreSource();
}
if (undoRemoveDst) {
// Rename failed - restore dst
if (dstParent.isDirectory() &&
dstParent.asDirectory().isWithSnapshot()) {
dstParent.asDirectory().undoRename4DstParent(removedDst,
dstIIP.getLatestSnapshotId());
} else {
fsd.addLastINodeNoQuotaCheck(tx.dstIIP, removedDst);
}
if (removedDst.isReference()) {
final INodeReference removedDstRef = removedDst.asReference();
final INodeReference.WithCount wc = (INodeReference.WithCount)
removedDstRef.getReferredINode().asReference();
wc.addReference(removedDstRef);
}
if (undoRemoveDst) { // Rename failed - restore dst
tx.restoreDst();
}
}
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
@ -590,8 +554,10 @@ class FSDirRenameOp {
private static class RenameOperation {
private final FSDirectory fsd;
private final INodesInPath srcIIP;
private final INodesInPath dstIIP;
private INodesInPath srcIIP;
private final INodesInPath srcParentIIP;
private INodesInPath dstIIP;
private final INodesInPath dstParentIIP;
private final String src;
private final String dst;
private final INodeReference.WithCount withCount;
@ -602,25 +568,31 @@ class FSDirRenameOp {
private final boolean srcChildIsReference;
private final Quota.Counts oldSrcCounts;
private INode srcChild;
private INode oldDstChild;
RenameOperation(FSDirectory fsd, String src, String dst,
INodesInPath srcIIP, INodesInPath dstIIP)
throws QuotaExceededException {
this.fsd = fsd;
this.dstIIP = dstIIP;
this.src = src;
this.dst = dst;
srcChild = srcIIP.getLastINode();
this.srcIIP = srcIIP;
this.dstIIP = dstIIP;
this.srcParentIIP = srcIIP.getParentINodesInPath();
this.dstParentIIP = dstIIP.getParentINodesInPath();
srcChild = this.srcIIP.getLastINode();
srcChildName = srcChild.getLocalNameBytes();
isSrcInSnapshot = srcChild.isInLatestSnapshot(srcIIP.getLatestSnapshotId());
final int srcLatestSnapshotId = srcIIP.getLatestSnapshotId();
isSrcInSnapshot = srcChild.isInLatestSnapshot(srcLatestSnapshotId);
srcChildIsReference = srcChild.isReference();
srcParent = srcIIP.getINode(-2).asDirectory();
srcParent = this.srcIIP.getINode(-2).asDirectory();
// Record the snapshot on srcChild. After the rename, before any new
// snapshot is taken on the dst tree, changes will be recorded in the
// latest snapshot of the src tree.
if (isSrcInSnapshot) {
srcChild.recordModification(srcIIP.getLatestSnapshotId());
srcChild.recordModification(srcLatestSnapshotId);
}
// check srcChild for reference
@ -628,12 +600,12 @@ class FSDirRenameOp {
srcChild.asReference().getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
oldSrcCounts = Quota.Counts.newInstance();
if (isSrcInSnapshot) {
final INodeReference.WithName withName =
srcIIP.getINode(-2).asDirectory().replaceChild4ReferenceWithName(
srcChild, srcIIP.getLatestSnapshotId());
final INodeReference.WithName withName = srcParent
.replaceChild4ReferenceWithName(srcChild, srcLatestSnapshotId);
withCount = (INodeReference.WithCount) withName.getReferredINode();
srcChild = withName;
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, srcChild);
this.srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1,
srcChild);
// get the counts before rename
withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
} else if (srcChildIsReference) {
@ -643,12 +615,45 @@ class FSDirRenameOp {
} else {
withCount = null;
}
this.srcIIP = srcIIP;
}
long removeSrc() throws IOException {
long removedNum = fsd.removeLastINode(srcIIP);
if (removedNum == -1) {
String error = "Failed to rename " + src + " to " + dst +
" because the source can not be removed";
NameNode.stateChangeLog.warn("DIR* FSDirRenameOp.unprotectedRenameTo:" +
error);
throw new IOException(error);
}
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
return removedNum;
}
boolean removeSrc4OldRename() throws IOException {
final long removedSrc = fsd.removeLastINode(srcIIP);
if (removedSrc == -1) {
NameNode.stateChangeLog.warn("DIR* FSDirRenameOp.unprotectedRenameTo: "
+ "failed to rename " + src + " to " + dst + " because the source" +
" can not be removed");
return false;
} else {
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
return true;
}
}
long removeDst() throws IOException {
long removedNum = fsd.removeLastINode(dstIIP);
if (removedNum != -1) {
oldDstChild = dstIIP.getLastINode();
dstIIP = INodesInPath.replace(dstIIP, dstIIP.length() - 1, null);
}
return removedNum;
}
boolean addSourceToDestination() {
final INode dstParent = dstIIP.getINode(-2);
srcChild = srcIIP.getLastINode();
final INode dstParent = dstParentIIP.getLastINode();
final byte[] dstChildName = dstIIP.getLastLocalName();
final INode toDst;
if (withCount == null) {
@ -656,16 +661,15 @@ class FSDirRenameOp {
toDst = srcChild;
} else {
withCount.getReferredINode().setLocalName(dstChildName);
int dstSnapshotId = dstIIP.getLatestSnapshotId();
toDst = new INodeReference.DstReference(dstParent.asDirectory(),
withCount, dstSnapshotId);
withCount, dstIIP.getLatestSnapshotId());
}
return fsd.addLastINodeNoQuotaCheck(dstIIP, toDst);
return fsd.addLastINodeNoQuotaCheck(dstParentIIP, toDst) != null;
}
void updateMtimeAndLease(long timestamp) throws QuotaExceededException {
srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
final INode dstParent = dstIIP.getINode(-2);
final INode dstParent = dstParentIIP.getLastINode();
dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
// update moved lease with new filename
fsd.getFSNamesystem().unprotectedChangeLease(src, dst);
@ -694,10 +698,44 @@ class FSDirRenameOp {
} else {
// srcParent is not an INodeDirectoryWithSnapshot, we only need to add
// the srcChild back
fsd.addLastINodeNoQuotaCheck(srcIIP, srcChild);
fsd.addLastINodeNoQuotaCheck(srcParentIIP, srcChild);
}
}
void restoreDst() throws QuotaExceededException {
Preconditions.checkState(oldDstChild != null);
final INodeDirectory dstParent = dstParentIIP.getLastINode().asDirectory();
if (dstParent.isWithSnapshot()) {
dstParent.undoRename4DstParent(oldDstChild, dstIIP.getLatestSnapshotId());
} else {
fsd.addLastINodeNoQuotaCheck(dstParentIIP, oldDstChild);
}
if (oldDstChild != null && oldDstChild.isReference()) {
final INodeReference removedDstRef = oldDstChild.asReference();
final INodeReference.WithCount wc = (INodeReference.WithCount)
removedDstRef.getReferredINode().asReference();
wc.addReference(removedDstRef);
}
}
boolean cleanDst(BlocksMapUpdateInfo collectedBlocks)
throws QuotaExceededException {
Preconditions.checkState(oldDstChild != null);
List<INode> removedINodes = new ChunkedArrayList<>();
final boolean filesDeleted;
if (!oldDstChild.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
oldDstChild.destroyAndCollectBlocks(collectedBlocks, removedINodes);
filesDeleted = true;
} else {
filesDeleted = oldDstChild.cleanSubtree(Snapshot.CURRENT_STATE_ID,
dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes,
true).get(Quota.NAMESPACE) >= 0;
}
fsd.getFSNamesystem().removePathAndBlocks(src, null, removedINodes,
false);
return filesDeleted;
}
void updateQuotasInSourceTree() throws QuotaExceededException {
// update the quota usage in src tree
if (isSrcInSnapshot) {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import java.io.IOException;
import java.util.Map;
import static org.apache.hadoop.util.Time.now;
@ -80,40 +81,39 @@ class FSDirSymlinkOp {
return fsd.getAuditFileInfo(iip);
}
static INodeSymlink unprotectedAddSymlink(
FSDirectory fsd, INodesInPath iip, long id, String target, long mtime,
long atime, PermissionStatus perm)
static INodeSymlink unprotectedAddSymlink(FSDirectory fsd, INodesInPath iip,
byte[] localName, long id, String target, long mtime, long atime,
PermissionStatus perm)
throws UnresolvedLinkException, QuotaExceededException {
assert fsd.hasWriteLock();
final INodeSymlink symlink = new INodeSymlink(id, null, perm, mtime, atime,
target);
return fsd.addINode(iip, symlink) ? symlink : null;
symlink.setLocalName(localName);
return fsd.addINode(iip, symlink) != null ? symlink : null;
}
/**
* Add the given symbolic link to the fs. Record it in the edits log.
*/
private static INodeSymlink addSymlink(
FSDirectory fsd, String path, INodesInPath iip, String target,
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException {
private static INodeSymlink addSymlink(FSDirectory fsd, String path,
INodesInPath iip, String target, PermissionStatus dirPerms,
boolean createParent, boolean logRetryCache) throws IOException {
final long mtime = now();
final byte[] localName = iip.getLastLocalName();
if (createParent) {
INodesInPath parentIIP = iip.getParentINodesInPath();
if (parentIIP == null || (parentIIP = FSDirMkdirOp.mkdirsRecursively(
fsd,
parentIIP, dirPerms, true, mtime)) == null) {
Map.Entry<INodesInPath, String> e = FSDirMkdirOp
.createAncestorDirectories(fsd, iip, dirPerms);
if (e == null) {
return null;
} else {
iip = INodesInPath.append(parentIIP, null, iip.getLastLocalName());
}
iip = INodesInPath.append(e.getKey(), null, localName);
}
final String userName = dirPerms.getUserName();
long id = fsd.allocateNewInodeId();
PermissionStatus perm = new PermissionStatus(
userName, null, FsPermission.getDefault());
INodeSymlink newNode =
unprotectedAddSymlink(fsd, iip, id, target, mtime, mtime, perm);
INodeSymlink newNode = unprotectedAddSymlink(fsd, iip.getExistingINodes(),
localName, id, target, mtime, mtime, perm);
if (newNode == null) {
NameNode.stateChangeLog.info("addSymlink: failed to add " + path);
return null;

View File

@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.io.Charsets;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -81,6 +82,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DE
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
import static org.apache.hadoop.util.Time.now;
/**
@ -370,53 +372,44 @@ public class FSDirectory implements Closeable {
/**
* Add the given filename to the fs.
* @throws FileAlreadyExistsException
* @throws QuotaExceededException
* @throws UnresolvedLinkException
* @throws SnapshotAccessControlException
* @return the new INodesInPath instance that contains the new INode
*/
INodeFile addFile(INodesInPath iip, String path, PermissionStatus permissions,
short replication, long preferredBlockSize,
String clientName, String clientMachine)
INodesInPath addFile(INodesInPath existing, String localName, PermissionStatus
permissions, short replication, long preferredBlockSize,
String clientName, String clientMachine)
throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException, AclException {
long modTime = now();
INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
modTime, replication, preferredBlockSize);
newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
newNode.toUnderConstruction(clientName, clientMachine);
boolean added = false;
INodesInPath newiip;
writeLock();
try {
added = addINode(iip, newNode);
newiip = addINode(existing, newNode);
} finally {
writeUnlock();
}
if (!added) {
NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path);
if (newiip == null) {
NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
existing.getPath() + "/" + localName);
return null;
}
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* addFile: " + path + " is added");
NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
}
return newNode;
return newiip;
}
INodeFile unprotectedAddFile(long id,
INodesInPath iip,
PermissionStatus permissions,
List<AclEntry> aclEntries,
List<XAttr> xAttrs,
short replication,
long modificationTime,
long atime,
long preferredBlockSize,
boolean underConstruction,
String clientName,
String clientMachine,
byte storagePolicyId) {
INodeFile addFileForEditLog(long id, INodesInPath existing, byte[] localName,
PermissionStatus permissions, List<AclEntry> aclEntries,
List<XAttr> xAttrs, short replication, long modificationTime, long atime,
long preferredBlockSize, boolean underConstruction, String clientName,
String clientMachine, byte storagePolicyId) {
final INodeFile newNode;
assert hasWriteLock();
if (underConstruction) {
@ -428,15 +421,15 @@ public class FSDirectory implements Closeable {
replication, preferredBlockSize, storagePolicyId);
}
newNode.setLocalName(localName);
try {
if (addINode(iip, newNode)) {
INodesInPath iip = addINode(existing, newNode);
if (iip != null) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(newNode, aclEntries,
Snapshot.CURRENT_STATE_ID);
AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
}
if (xAttrs != null) {
XAttrStorage.updateINodeXAttrs(newNode, xAttrs,
Snapshot.CURRENT_STATE_ID);
XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
}
return newNode;
}
@ -444,7 +437,7 @@ public class FSDirectory implements Closeable {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"DIR* FSDirectory.unprotectedAddFile: exception when add "
+ iip.getPath() + " to the file system", e);
+ existing.getPath() + " to the file system", e);
}
}
return null;
@ -683,7 +676,7 @@ public class FSDirectory implements Closeable {
if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
targetNode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
} else {
Quota.Counts counts = targetNode.cleanSubtree(Snapshot.CURRENT_STATE_ID,
Quota.Counts counts = targetNode.cleanSubtree(CURRENT_STATE_ID,
latestSnapshot, collectedBlocks, removedINodes, true);
parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
-counts.get(Quota.DISKSPACE), true);
@ -857,16 +850,18 @@ public class FSDirectory implements Closeable {
/**
* Add the given child to the namespace.
* @param iip the INodesInPath instance containing all the ancestral INodes
* @param existing the INodesInPath containing all the ancestral INodes
* @param child the new INode to add
* @return a new INodesInPath instance containing the new child INode. Null
* if the adding fails.
* @throws QuotaExceededException is thrown if it violates quota limit
*/
boolean addINode(INodesInPath iip, INode child)
INodesInPath addINode(INodesInPath existing, INode child)
throws QuotaExceededException, UnresolvedLinkException {
child.setLocalName(iip.getLastLocalName());
cacheName(child);
writeLock();
try {
return addLastINode(iip, child, true);
return addLastINode(existing, child, true);
} finally {
writeUnlock();
}
@ -958,7 +953,7 @@ public class FSDirectory implements Closeable {
*/
void verifyMaxDirItems(INodeDirectory parent, String parentPath)
throws MaxDirectoryItemsExceededException {
final int count = parent.getChildrenList(Snapshot.CURRENT_STATE_ID).size();
final int count = parent.getChildrenList(CURRENT_STATE_ID).size();
if (count >= maxDirItems) {
final MaxDirectoryItemsExceededException e
= new MaxDirectoryItemsExceededException(maxDirItems, count);
@ -974,35 +969,27 @@ public class FSDirectory implements Closeable {
}
/**
* The same as {@link #addChild(INodesInPath, int, INode, boolean)}
* with pos = length - 1.
* Add a child to the end of the path specified by INodesInPath.
* @return an INodesInPath instance containing the new INode
*/
private boolean addLastINode(INodesInPath inodesInPath, INode inode,
INodesInPath addLastINode(INodesInPath existing, INode inode,
boolean checkQuota) throws QuotaExceededException {
final int pos = inodesInPath.length() - 1;
return addChild(inodesInPath, pos, inode, checkQuota);
}
assert existing.getLastINode() != null &&
existing.getLastINode().isDirectory();
/** Add a node child to the inodes at index pos.
* Its ancestors are stored at [0, pos-1].
* @return false if the child with this name already exists;
* otherwise return true;
* @throws QuotaExceededException is thrown if it violates quota limit
*/
boolean addChild(INodesInPath iip, int pos, INode child, boolean checkQuota)
throws QuotaExceededException {
final int pos = existing.length();
// Disallow creation of /.reserved. This may be created when loading
// editlog/fsimage during upgrade since /.reserved was a valid name in older
// release. This may also be called when a user tries to create a file
// or directory /.reserved.
if (pos == 1 && iip.getINode(0) == rootDir && isReservedName(child)) {
if (pos == 1 && existing.getINode(0) == rootDir && isReservedName(inode)) {
throw new HadoopIllegalArgumentException(
"File name \"" + child.getLocalName() + "\" is reserved and cannot "
"File name \"" + inode.getLocalName() + "\" is reserved and cannot "
+ "be created. If this is during upgrade change the name of the "
+ "existing file or directory to another name before upgrading "
+ "to the new release.");
}
final INodeDirectory parent = iip.getINode(pos-1).asDirectory();
final INodeDirectory parent = existing.getINode(pos - 1).asDirectory();
// The filesystem limits are not really quotas, so this check may appear
// odd. It's because a rename operation deletes the src, tries to add
// to the dest, if that fails, re-adds the src from whence it came.
@ -1010,44 +997,45 @@ public class FSDirectory implements Closeable {
// original location becase a quota violation would cause the the item
// to go "poof". The fs limits must be bypassed for the same reason.
if (checkQuota) {
final String parentPath = iip.getPath(pos - 1);
verifyMaxComponentLength(child.getLocalNameBytes(), parentPath);
final String parentPath = existing.getPath(pos - 1);
verifyMaxComponentLength(inode.getLocalNameBytes(), parentPath);
verifyMaxDirItems(parent, parentPath);
}
// always verify inode name
verifyINodeName(child.getLocalNameBytes());
verifyINodeName(inode.getLocalNameBytes());
final Quota.Counts counts = child.computeQuotaUsage();
updateCount(iip, pos,
final Quota.Counts counts = inode.computeQuotaUsage();
updateCount(existing, pos,
counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
boolean isRename = (child.getParent() != null);
boolean isRename = (inode.getParent() != null);
boolean added;
try {
added = parent.addChild(child, true, iip.getLatestSnapshotId());
added = parent.addChild(inode, true, existing.getLatestSnapshotId());
} catch (QuotaExceededException e) {
updateCountNoQuotaCheck(iip, pos,
updateCountNoQuotaCheck(existing, pos,
-counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
throw e;
}
if (!added) {
updateCountNoQuotaCheck(iip, pos,
updateCountNoQuotaCheck(existing, pos,
-counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
return null;
} else {
if (!isRename) {
AclStorage.copyINodeDefaultAcl(child);
AclStorage.copyINodeDefaultAcl(inode);
}
addToInodeMap(child);
addToInodeMap(inode);
}
return added;
return INodesInPath.append(existing, inode, inode.getLocalNameBytes());
}
boolean addLastINodeNoQuotaCheck(INodesInPath inodesInPath, INode i) {
INodesInPath addLastINodeNoQuotaCheck(INodesInPath existing, INode i) {
try {
return addLastINode(inodesInPath, i, false);
return addLastINode(existing, i, false);
} catch (QuotaExceededException e) {
NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e);
}
return false;
return null;
}
/**
@ -1058,8 +1046,7 @@ public class FSDirectory implements Closeable {
* reference nodes;
* >0 otherwise.
*/
long removeLastINode(final INodesInPath iip)
throws QuotaExceededException {
long removeLastINode(final INodesInPath iip) throws QuotaExceededException {
final int latestSnapshot = iip.getLatestSnapshotId();
final INode last = iip.getLastINode();
final INodeDirectory parent = iip.getINode(-2).asDirectory();

View File

@ -347,6 +347,7 @@ public class FSEditLogLoader {
if (oldFile != null && addCloseOp.overwrite) {
// This is OP_ADD with overwrite
fsDir.unprotectedDelete(path, addCloseOp.mtime);
iip = INodesInPath.replace(iip, iip.length() - 1, null);
oldFile = null;
}
INodeFile newFile = oldFile;
@ -358,13 +359,16 @@ public class FSEditLogLoader {
assert addCloseOp.blocks.length == 0;
// add to the file tree
inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
lastInodeId);
newFile = fsDir.unprotectedAddFile(
inodeId, iip, addCloseOp.permissions, addCloseOp.aclEntries,
addCloseOp.xAttrs, replication, addCloseOp.mtime,
addCloseOp.atime, addCloseOp.blockSize, true,
addCloseOp.clientName, addCloseOp.clientMachine,
inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion, lastInodeId);
newFile = fsDir.addFileForEditLog(inodeId, iip.getExistingINodes(),
iip.getLastLocalName(),
addCloseOp.permissions,
addCloseOp.aclEntries,
addCloseOp.xAttrs, replication,
addCloseOp.mtime, addCloseOp.atime,
addCloseOp.blockSize, true,
addCloseOp.clientName,
addCloseOp.clientMachine,
addCloseOp.storagePolicyId);
iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
@ -506,7 +510,7 @@ public class FSEditLogLoader {
RenameOldOp renameOp = (RenameOldOp)op;
final String src = renameReservedPathsOnUpgrade(renameOp.src, logVersion);
final String dst = renameReservedPathsOnUpgrade(renameOp.dst, logVersion);
FSDirRenameOp.unprotectedRenameTo(fsDir, src, dst, renameOp.timestamp);
FSDirRenameOp.renameForEditLog(fsDir, src, dst, renameOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
@ -528,7 +532,7 @@ public class FSEditLogLoader {
MkdirOp mkdirOp = (MkdirOp)op;
inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
lastInodeId);
FSDirMkdirOp.unprotectedMkdir(fsDir, inodeId,
FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
break;
@ -568,9 +572,10 @@ public class FSEditLogLoader {
}
case OP_SET_QUOTA:
SetQuotaOp setQuotaOp = (SetQuotaOp)op;
FSDirAttrOp.unprotectedSetQuota(fsDir, renameReservedPathsOnUpgrade(
setQuotaOp.src, logVersion), setQuotaOp.nsQuota, setQuotaOp.dsQuota);
SetQuotaOp setQuotaOp = (SetQuotaOp) op;
FSDirAttrOp.unprotectedSetQuota(fsDir,
renameReservedPathsOnUpgrade(setQuotaOp.src, logVersion),
setQuotaOp.nsQuota, setQuotaOp.dsQuota);
break;
case OP_TIMES: {
@ -587,9 +592,9 @@ public class FSEditLogLoader {
final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
logVersion);
final INodesInPath iip = fsDir.getINodesInPath(path, false);
FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip, inodeId, symlinkOp.value,
symlinkOp.mtime, symlinkOp.atime,
symlinkOp.permissionStatus);
FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip.getExistingINodes(),
iip.getLastLocalName(), inodeId, symlinkOp.value, symlinkOp.mtime,
symlinkOp.atime, symlinkOp.permissionStatus);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
@ -598,7 +603,7 @@ public class FSEditLogLoader {
}
case OP_RENAME: {
RenameOp renameOp = (RenameOp)op;
FSDirRenameOp.unprotectedRenameTo(fsDir,
FSDirRenameOp.renameForEditLog(fsDir,
renameReservedPathsOnUpgrade(renameOp.src, logVersion),
renameReservedPathsOnUpgrade(renameOp.dst, logVersion),
renameOp.timestamp, renameOp.options);

View File

@ -2211,13 +2211,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create file" + src);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
toRemoveBlocks = startFileInternal(pc, iip, permissions, holder,
clientMachine, create, overwrite, createParent, replication,
blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
FSDirectory.isReservedRawName(srcArg), true);
dir.writeLock();
try {
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
toRemoveBlocks = startFileInternal(
pc, iip, permissions, holder,
clientMachine, create, overwrite,
createParent, replication, blockSize,
isLazyPersist, suite, protocolVersion, edek,
logRetryCache);
stat = FSDirStatAndListingOp.getFileInfo(
dir, src, false, FSDirectory.isReservedRawName(srcArg), true);
} finally {
dir.writeUnlock();
}
} catch (StandbyException se) {
skipSync = true;
throw se;
@ -2311,6 +2319,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
long ret = dir.delete(iip, toRemoveBlocks, toRemoveINodes, now());
if (ret >= 0) {
iip = INodesInPath.replace(iip, iip.length() - 1, null);
incrDeletedFileCount(ret);
removePathAndBlocks(src, null, toRemoveINodes, true);
}
@ -2326,12 +2335,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
INodeFile newNode = null;
// Always do an implicit mkdirs for parent directory tree.
INodesInPath parentIIP = iip.getParentINodesInPath();
if (parentIIP != null && (parentIIP = FSDirMkdirOp.mkdirsRecursively(dir,
parentIIP, permissions, true, now())) != null) {
iip = INodesInPath.append(parentIIP, newNode, iip.getLastLocalName());
newNode = dir.addFile(iip, src, permissions, replication, blockSize,
holder, clientMachine);
Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
.createAncestorDirectories(dir, iip, permissions);
if (parent != null) {
iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
replication, blockSize, holder, clientMachine);
newNode = iip != null ? iip.getLastINode().asFile() : null;
}
if (newNode == null) {

View File

@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@ -352,6 +353,21 @@ public class INodesInPath {
return DFSUtil.byteArray2PathString(path, 0, pos);
}
/**
* @param offset start endpoint (inclusive)
* @param length number of path components
* @return sub-list of the path
*/
public List<String> getPath(int offset, int length) {
Preconditions.checkArgument(offset >= 0 && length >= 0 && offset + length
<= path.length);
ImmutableList.Builder<String> components = ImmutableList.builder();
for (int i = offset; i < offset + length; i++) {
components.add(DFSUtil.bytes2String(path[i]));
}
return components.build();
}
public int length() {
return inodes.length;
}
@ -363,27 +379,17 @@ public class INodesInPath {
/**
* @param length number of ancestral INodes in the returned INodesInPath
* instance
* @return the INodesInPath instance containing ancestral INodes
* @return the INodesInPath instance containing ancestral INodes. Note that
* this method only handles non-snapshot paths.
*/
private INodesInPath getAncestorINodesInPath(int length) {
Preconditions.checkArgument(length >= 0 && length < inodes.length);
Preconditions.checkState(!isSnapshot());
final INode[] anodes = new INode[length];
final byte[][] apath;
final boolean isSnapshot;
final int snapshotId;
int dotSnapshotIndex = getDotSnapshotIndex();
if (this.isSnapshot && length >= dotSnapshotIndex + 1) {
apath = new byte[length + 1][];
isSnapshot = true;
snapshotId = this.snapshotId;
} else {
apath = new byte[length][];
isSnapshot = false;
snapshotId = this.isSnapshot ? CURRENT_STATE_ID : this.snapshotId;
}
final byte[][] apath = new byte[length][];
System.arraycopy(this.inodes, 0, anodes, 0, length);
System.arraycopy(this.path, 0, apath, 0, apath.length);
return new INodesInPath(anodes, apath, isSnapshot, snapshotId);
System.arraycopy(this.path, 0, apath, 0, length);
return new INodesInPath(anodes, apath, false, snapshotId);
}
/**
@ -395,19 +401,23 @@ public class INodesInPath {
null;
}
private int getDotSnapshotIndex() {
if (isSnapshot) {
for (int i = 0; i < path.length; i++) {
if (isDotSnapshotDir(path[i])) {
return i;
}
/**
* @return a new INodesInPath instance that only contains exisitng INodes.
* Note that this method only handles non-snapshot paths.
*/
public INodesInPath getExistingINodes() {
Preconditions.checkState(!isSnapshot());
int i = 0;
for (; i < inodes.length; i++) {
if (inodes[i] == null) {
break;
}
throw new IllegalStateException("The path " + getPath()
+ " is a snapshot path but does not contain "
+ HdfsConstants.DOT_SNAPSHOT_DIR);
} else {
return -1;
}
INode[] existing = new INode[i];
byte[][] existingPath = new byte[i][];
System.arraycopy(inodes, 0, existing, 0, i);
System.arraycopy(path, 0, existingPath, 0, i);
return new INodesInPath(existing, existingPath, false, snapshotId);
}
/**

View File

@ -76,7 +76,7 @@ import org.mockito.Mockito;
/** Testing rename with snapshots. */
public class TestRenameWithSnapshots {
{
static {
SnapshotTestHelper.disableLogs();
}
private static final Log LOG = LogFactory.getLog(TestRenameWithSnapshots.class);
@ -2066,10 +2066,10 @@ public class TestRenameWithSnapshots {
/**
* This test demonstrates that
* {@link INodeDirectory#removeChild(INode, Snapshot)}
* {@link INodeDirectory#removeChild}
* and
* {@link INodeDirectory#addChild(INode, boolean, Snapshot)}
* should use {@link INode#isInLatestSnapshot(Snapshot)} to check if the
* {@link INodeDirectory#addChild}
* should use {@link INode#isInLatestSnapshot} to check if the
* added/removed child should be recorded in snapshots.
*/
@Test