HDFS-7468. Moving verify* functions to corresponding classes. Contributed by Li Lu.

This commit is contained in:
Haohui Mai 2014-12-04 14:09:45 -08:00
parent 258623ff8b
commit 26d8dec756
4 changed files with 78 additions and 71 deletions

View File

@ -427,6 +427,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7458. Add description to the nfs ports in core-site.xml used by nfs HDFS-7458. Add description to the nfs ports in core-site.xml used by nfs
test to avoid confusion (Yongjun Zhang via brandonli) test to avoid confusion (Yongjun Zhang via brandonli)
HDFS-7468. Moving verify* functions to corresponding classes.
(Li Lu via wheat9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.protocol.SnapshotException;
@ -72,6 +73,51 @@ class FSDirRenameOp {
return new RenameOldResult(status, resultingStat); return new RenameOldResult(status, resultingStat);
} }
/**
* Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
* dstInodes[dstInodes.length-1]
*/
static void verifyQuotaForRename(FSDirectory fsd,
INode[] src, INode[] dst)
throws QuotaExceededException {
if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
// Do not check quota if edits log is still being processed
return;
}
int i = 0;
while(src[i] == dst[i]) { i++; }
// src[i - 1] is the last common ancestor.
final Quota.Counts delta = src[src.length - 1].computeQuotaUsage();
// Reduce the required quota by dst that is being removed
final int dstIndex = dst.length - 1;
if (dst[dstIndex] != null) {
delta.subtract(dst[dstIndex].computeQuotaUsage());
}
FSDirectory.verifyQuota(dst, dstIndex, delta.get(Quota.NAMESPACE),
delta.get(Quota.DISKSPACE), src[i - 1]);
}
/**
* Checks file system limits (max component length and max directory items)
* during a rename operation.
*/
static void verifyFsLimitsForRename(FSDirectory fsd,
INodesInPath srcIIP,
INodesInPath dstIIP)
throws FSLimitException.PathComponentTooLongException,
FSLimitException.MaxDirectoryItemsExceededException {
byte[] dstChildName = dstIIP.getLastLocalName();
INode[] dstInodes = dstIIP.getINodes();
int pos = dstInodes.length - 1;
fsd.verifyMaxComponentLength(dstChildName, dstInodes, pos);
// Do not enforce max directory items if renaming within same directory.
if (srcIIP.getINode(-2) != dstIIP.getINode(-2)) {
fsd.verifyMaxDirItems(dstInodes, pos);
}
}
/** /**
* Change a path name * Change a path name
* *
@ -129,8 +175,8 @@ class FSDirRenameOp {
fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src); fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
// Ensure dst has quota to accommodate rename // Ensure dst has quota to accommodate rename
fsd.verifyFsLimitsForRename(srcIIP, dstIIP); verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
fsd.verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes()); verifyQuotaForRename(fsd, srcIIP.getINodes(), dstIIP.getINodes());
RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP); RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
@ -310,8 +356,8 @@ class FSDirRenameOp {
} }
// Ensure dst has quota to accommodate rename // Ensure dst has quota to accommodate rename
fsd.verifyFsLimitsForRename(srcIIP, dstIIP); verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
fsd.verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes()); verifyQuotaForRename(fsd, srcIIP.getINodes(), dstIIP.getINodes());
RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP); RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);

View File

@ -17,9 +17,12 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@ -32,6 +35,19 @@ import java.io.IOException;
import java.util.List; import java.util.List;
class FSDirSnapshotOp { class FSDirSnapshotOp {
/** Verify if the snapshot name is legal. */
static void verifySnapshotName(FSDirectory fsd, String snapshotName,
String path)
throws FSLimitException.PathComponentTooLongException {
if (snapshotName.contains(Path.SEPARATOR)) {
throw new HadoopIllegalArgumentException(
"Snapshot name cannot contain \"" + Path.SEPARATOR + "\"");
}
final byte[] bytes = DFSUtil.string2Bytes(snapshotName);
fsd.verifyINodeName(bytes);
fsd.verifyMaxComponentLength(bytes, path, 0);
}
/** Allow snapshot on a directory. */ /** Allow snapshot on a directory. */
static void allowSnapshot(FSDirectory fsd, SnapshotManager snapshotManager, static void allowSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
String path) throws IOException { String path) throws IOException {
@ -82,7 +98,7 @@ class FSDirSnapshotOp {
snapshotName); snapshotName);
} }
} }
fsd.verifySnapshotName(snapshotName, snapshotRoot); verifySnapshotName(fsd, snapshotName, snapshotRoot);
fsd.writeLock(); fsd.writeLock();
try { try {
snapshotPath = snapshotManager.createSnapshot(snapshotRoot, snapshotName); snapshotPath = snapshotManager.createSnapshot(snapshotRoot, snapshotName);
@ -103,7 +119,7 @@ class FSDirSnapshotOp {
FSPermissionChecker pc = fsd.getPermissionChecker(); FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.checkOwner(pc, path); fsd.checkOwner(pc, path);
} }
fsd.verifySnapshotName(snapshotNewName, path); verifySnapshotName(fsd, snapshotNewName, path);
fsd.writeLock(); fsd.writeLock();
try { try {
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName); snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);

View File

@ -291,6 +291,10 @@ public class FSDirectory implements Closeable {
} }
} }
boolean shouldSkipQuotaChecks() {
return skipQuotaCheck;
}
/** Enable quota verification */ /** Enable quota verification */
void enableQuotaChecks() { void enableQuotaChecks() {
skipQuotaCheck = false; skipQuotaCheck = false;
@ -1095,7 +1099,7 @@ public class FSDirectory implements Closeable {
* Pass null if a node is not being moved. * Pass null if a node is not being moved.
* @throws QuotaExceededException if quota limit is exceeded. * @throws QuotaExceededException if quota limit is exceeded.
*/ */
private static void verifyQuota(INode[] inodes, int pos, long nsDelta, static void verifyQuota(INode[] inodes, int pos, long nsDelta,
long dsDelta, INode commonAncestor) throws QuotaExceededException { long dsDelta, INode commonAncestor) throws QuotaExceededException {
if (nsDelta <= 0 && dsDelta <= 0) { if (nsDelta <= 0 && dsDelta <= 0) {
// if quota is being freed or not being consumed // if quota is being freed or not being consumed
@ -1120,69 +1124,7 @@ public class FSDirectory implements Closeable {
} }
} }
} }
/**
* Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
* dstInodes[dstInodes.length-1]
*
* @param src directory from where node is being moved.
* @param dst directory to where node is moved to.
* @throws QuotaExceededException if quota limit is exceeded.
*/
void verifyQuotaForRename(INode[] src, INode[] dst)
throws QuotaExceededException {
if (!namesystem.isImageLoaded() || skipQuotaCheck) {
// Do not check quota if edits log is still being processed
return;
}
int i = 0;
while(src[i] == dst[i]) { i++; }
// src[i - 1] is the last common ancestor.
final Quota.Counts delta = src[src.length - 1].computeQuotaUsage();
// Reduce the required quota by dst that is being removed
final int dstIndex = dst.length - 1;
if (dst[dstIndex] != null) {
delta.subtract(dst[dstIndex].computeQuotaUsage());
}
verifyQuota(dst, dstIndex, delta.get(Quota.NAMESPACE),
delta.get(Quota.DISKSPACE), src[i - 1]);
}
/**
* Checks file system limits (max component length and max directory items)
* during a rename operation.
*
* @param srcIIP INodesInPath containing every inode in the rename source
* @param dstIIP INodesInPath containing every inode in the rename destination
* @throws PathComponentTooLongException child's name is too long.
* @throws MaxDirectoryItemsExceededException too many children.
*/
void verifyFsLimitsForRename(INodesInPath srcIIP, INodesInPath dstIIP)
throws PathComponentTooLongException, MaxDirectoryItemsExceededException {
byte[] dstChildName = dstIIP.getLastLocalName();
INode[] dstInodes = dstIIP.getINodes();
int pos = dstInodes.length - 1;
verifyMaxComponentLength(dstChildName, dstInodes, pos);
// Do not enforce max directory items if renaming within same directory.
if (srcIIP.getINode(-2) != dstIIP.getINode(-2)) {
verifyMaxDirItems(dstInodes, pos);
}
}
/** Verify if the snapshot name is legal. */
void verifySnapshotName(String snapshotName, String path)
throws PathComponentTooLongException {
if (snapshotName.contains(Path.SEPARATOR)) {
throw new HadoopIllegalArgumentException(
"Snapshot name cannot contain \"" + Path.SEPARATOR + "\"");
}
final byte[] bytes = DFSUtil.string2Bytes(snapshotName);
verifyINodeName(bytes);
verifyMaxComponentLength(bytes, path, 0);
}
/** Verify if the inode name is legal. */ /** Verify if the inode name is legal. */
void verifyINodeName(byte[] childName) throws HadoopIllegalArgumentException { void verifyINodeName(byte[] childName) throws HadoopIllegalArgumentException {
if (Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, childName)) { if (Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, childName)) {
@ -1202,7 +1144,7 @@ public class FSDirectory implements Closeable {
* @param pos int position of new child in path * @param pos int position of new child in path
* @throws PathComponentTooLongException child's name is too long. * @throws PathComponentTooLongException child's name is too long.
*/ */
private void verifyMaxComponentLength(byte[] childName, Object parentPath, void verifyMaxComponentLength(byte[] childName, Object parentPath,
int pos) throws PathComponentTooLongException { int pos) throws PathComponentTooLongException {
if (maxComponentLength == 0) { if (maxComponentLength == 0) {
return; return;
@ -1230,7 +1172,7 @@ public class FSDirectory implements Closeable {
* @param pos int position of new child in pathComponents * @param pos int position of new child in pathComponents
* @throws MaxDirectoryItemsExceededException too many children. * @throws MaxDirectoryItemsExceededException too many children.
*/ */
private void verifyMaxDirItems(INode[] pathComponents, int pos) void verifyMaxDirItems(INode[] pathComponents, int pos)
throws MaxDirectoryItemsExceededException { throws MaxDirectoryItemsExceededException {
final INodeDirectory parent = pathComponents[pos-1].asDirectory(); final INodeDirectory parent = pathComponents[pos-1].asDirectory();