From 49122df1c00e31d9a397dade4d534c794ccf7224 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Mon, 1 Dec 2014 15:28:10 -0800 Subject: [PATCH] HDFS-7450. Consolidate the implementation of GetFileInfo(), GetListings() and GetContentSummary() into a single class. Contributed by Haohui Mai. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../namenode/FSDirStatAndListingOp.java | 480 ++++++++++++++++++ .../hdfs/server/namenode/FSDirectory.java | 364 +------------ .../hdfs/server/namenode/FSEditLogLoader.java | 7 +- .../hdfs/server/namenode/FSNamesystem.java | 128 +---- .../hdfs/server/namenode/NameNodeAdapter.java | 3 +- 6 files changed, 534 insertions(+), 451 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d9094973b66..3c726893825 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -150,6 +150,9 @@ Release 2.7.0 - UNRELEASED HDFS-7210. Avoid two separate RPC's namenode.append() and namenode.getFileInfo() for an append call from DFSClient. (Vinayakumar B via umamahesh) + HDFS-7450. Consolidate the implementation of GetFileInfo(), GetListings() and + GetContentSummary() into a single class. (wheat9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java new file mode 100644 index 00000000000..35b3a6bcd4b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@ -0,0 +1,480 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException; +import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.fs.InvalidPathException; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.FsPermissionExtension; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.SnapshotException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; +import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; +import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.hdfs.util.ReadOnlyList; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; + +class FSDirStatAndListingOp { + static DirectoryListing getListingInt( + FSDirectory fsd, final String srcArg, byte[] startAfter, + boolean needLocation) + throws IOException { + String src = srcArg; + FSPermissionChecker pc = fsd.getPermissionChecker(); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + String startAfterString = new String(startAfter); + src = fsd.resolvePath(pc, src, pathComponents); + + // Get file name when startAfter is an INodePath + if (FSDirectory.isReservedName(startAfterString)) { + byte[][] startAfterComponents = FSDirectory + .getPathComponentsForReservedPath(startAfterString); + try { + String tmp = FSDirectory.resolvePath(src, startAfterComponents, fsd); + byte[][] regularPath = INode.getPathComponents(tmp); + startAfter = regularPath[regularPath.length - 1]; + } catch (IOException e) { + // Possibly the inode is deleted + throw new DirectoryListingStartAfterNotFoundException( + "Can't find startAfter " + startAfterString); + } + } + + boolean isSuperUser = true; + if (fsd.isPermissionEnabled()) { + if (fsd.isDir(src)) { + fsd.checkPathAccess(pc, src, FsAction.READ_EXECUTE); + } else { + fsd.checkTraverse(pc, src); + } + isSuperUser = pc.isSuperUser(); + } + return getListing(fsd, src, startAfter, needLocation, isSuperUser); + } + + /** + * Get the file info for a specific file. + * + * @param srcArg The string representation of the path to the file + * @param resolveLink whether to throw UnresolvedLinkException + * if src refers to a symlink + * + * @return object containing information regarding the file + * or null if file not found + */ + static HdfsFileStatus getFileInfo( + FSDirectory fsd, String srcArg, boolean resolveLink) + throws IOException { + String src = srcArg; + if (!DFSUtil.isValidName(src)) { + throw new InvalidPathException("Invalid file name: " + src); + } + FSPermissionChecker pc = fsd.getPermissionChecker(); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsd.resolvePath(pc, src, pathComponents); + boolean isSuperUser = true; + if (fsd.isPermissionEnabled()) { + fsd.checkPermission(pc, src, false, null, null, null, null, false, + resolveLink); + isSuperUser = pc.isSuperUser(); + } + return getFileInfo(fsd, src, resolveLink, + FSDirectory.isReservedRawName(srcArg), isSuperUser); + } + + /** + * Returns true if the file is closed + */ + static boolean isFileClosed(FSDirectory fsd, String src) throws IOException { + FSPermissionChecker pc = fsd.getPermissionChecker(); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsd.resolvePath(pc, src, pathComponents); + if (fsd.isPermissionEnabled()) { + fsd.checkTraverse(pc, src); + } + return !INodeFile.valueOf(fsd.getINode(src), src).isUnderConstruction(); + } + + static ContentSummary getContentSummary( + FSDirectory fsd, String src) throws IOException { + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + FSPermissionChecker pc = fsd.getPermissionChecker(); + src = fsd.resolvePath(pc, src, pathComponents); + if (fsd.isPermissionEnabled()) { + fsd.checkPermission(pc, src, false, null, null, null, + FsAction.READ_EXECUTE); + } + return getContentSummaryInt(fsd, src); + } + + /** + * Get a partial listing of the indicated directory + * + * We will stop when any of the following conditions is met: + * 1) this.lsLimit files have been added + * 2) needLocation is true AND enough files have been added such + * that at least this.lsLimit block locations are in the response + * + * @param fsd FSDirectory + * @param src the directory name + * @param startAfter the name to start listing after + * @param needLocation if block locations are returned + * @return a partial listing starting after startAfter + */ + private static DirectoryListing getListing( + FSDirectory fsd, String src, byte[] startAfter, boolean needLocation, + boolean isSuperUser) + throws IOException { + String srcs = FSDirectory.normalizePath(src); + final boolean isRawPath = FSDirectory.isReservedRawName(src); + + fsd.readLock(); + try { + if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) { + return getSnapshotsListing(fsd, srcs, startAfter); + } + final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, true); + final INode[] inodes = inodesInPath.getINodes(); + final int snapshot = inodesInPath.getPathSnapshotId(); + final INode targetNode = inodes[inodes.length - 1]; + if (targetNode == null) + return null; + byte parentStoragePolicy = isSuperUser ? + targetNode.getStoragePolicyID() : BlockStoragePolicySuite + .ID_UNSPECIFIED; + + if (!targetNode.isDirectory()) { + return new DirectoryListing( + new HdfsFileStatus[]{createFileStatus(fsd, + HdfsFileStatus.EMPTY_NAME, targetNode, needLocation, + parentStoragePolicy, snapshot, isRawPath, inodesInPath)}, 0); + } + + final INodeDirectory dirInode = targetNode.asDirectory(); + final ReadOnlyList contents = dirInode.getChildrenList(snapshot); + int startChild = INodeDirectory.nextChild(contents, startAfter); + int totalNumChildren = contents.size(); + int numOfListing = Math.min(totalNumChildren - startChild, + fsd.getLsLimit()); + int locationBudget = fsd.getLsLimit(); + int listingCnt = 0; + HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing]; + for (int i=0; i0; i++) { + INode cur = contents.get(startChild+i); + byte curPolicy = isSuperUser && !cur.isSymlink()? + cur.getLocalStoragePolicyID(): + BlockStoragePolicySuite.ID_UNSPECIFIED; + listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur, + needLocation, fsd.getStoragePolicyID(curPolicy, + parentStoragePolicy), snapshot, isRawPath, inodesInPath); + listingCnt++; + if (needLocation) { + // Once we hit lsLimit locations, stop. + // This helps to prevent excessively large response payloads. + // Approximate #locations with locatedBlockCount() * repl_factor + LocatedBlocks blks = + ((HdfsLocatedFileStatus)listing[i]).getBlockLocations(); + locationBudget -= (blks == null) ? 0 : + blks.locatedBlockCount() * listing[i].getReplication(); + } + } + // truncate return array if necessary + if (listingCnt < numOfListing) { + listing = Arrays.copyOf(listing, listingCnt); + } + return new DirectoryListing( + listing, totalNumChildren-startChild-listingCnt); + } finally { + fsd.readUnlock(); + } + } + + /** + * Get a listing of all the snapshots of a snapshottable directory + */ + private static DirectoryListing getSnapshotsListing( + FSDirectory fsd, String src, byte[] startAfter) + throws IOException { + Preconditions.checkState(fsd.hasReadLock()); + Preconditions.checkArgument( + src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR), + "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR); + + final String dirPath = FSDirectory.normalizePath(src.substring(0, + src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length())); + + final INode node = fsd.getINode(dirPath); + final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath); + final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature(); + if (sf == null) { + throw new SnapshotException( + "Directory is not a snapshottable directory: " + dirPath); + } + final ReadOnlyList snapshots = sf.getSnapshotList(); + int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter); + skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1; + int numOfListing = Math.min(snapshots.size() - skipSize, fsd.getLsLimit()); + final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing]; + for (int i = 0; i < numOfListing; i++) { + Snapshot.Root sRoot = snapshots.get(i + skipSize).getRoot(); + listing[i] = createFileStatus(fsd, sRoot.getLocalNameBytes(), sRoot, + BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, + false, null); + } + return new DirectoryListing( + listing, snapshots.size() - skipSize - numOfListing); + } + + /** Get the file info for a specific file. + * @param fsd FSDirectory + * @param src The string representation of the path to the file + * @param resolveLink whether to throw UnresolvedLinkException + * @param isRawPath true if a /.reserved/raw pathname was passed by the user + * @param includeStoragePolicy whether to include storage policy + * @return object containing information regarding the file + * or null if file not found + */ + static HdfsFileStatus getFileInfo( + FSDirectory fsd, String src, boolean resolveLink, boolean isRawPath, + boolean includeStoragePolicy) + throws IOException { + String srcs = FSDirectory.normalizePath(src); + fsd.readLock(); + try { + if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) { + return getFileInfo4DotSnapshot(fsd, srcs); + } + final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, resolveLink); + final INode[] inodes = inodesInPath.getINodes(); + final INode i = inodes[inodes.length - 1]; + byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ? + i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED; + return i == null ? null : createFileStatus(fsd, + HdfsFileStatus.EMPTY_NAME, i, policyId, + inodesInPath.getPathSnapshotId(), isRawPath, inodesInPath); + } finally { + fsd.readUnlock(); + } + } + + /** + * Currently we only support "ls /xxx/.snapshot" which will return all the + * snapshots of a directory. The FSCommand Ls will first call getFileInfo to + * make sure the file/directory exists (before the real getListing call). + * Since we do not have a real INode for ".snapshot", we return an empty + * non-null HdfsFileStatus here. + */ + private static HdfsFileStatus getFileInfo4DotSnapshot( + FSDirectory fsd, String src) + throws UnresolvedLinkException { + if (fsd.getINode4DotSnapshot(src) != null) { + return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null, + HdfsFileStatus.EMPTY_NAME, -1L, 0, null, + BlockStoragePolicySuite.ID_UNSPECIFIED); + } + return null; + } + + /** + * create an hdfs file status from an inode + * + * @param fsd FSDirectory + * @param path the local name + * @param node inode + * @param needLocation if block locations need to be included or not + * @param isRawPath true if this is being called on behalf of a path in + * /.reserved/raw + * @return a file status + * @throws java.io.IOException if any error occurs + */ + static HdfsFileStatus createFileStatus( + FSDirectory fsd, byte[] path, INode node, boolean needLocation, + byte storagePolicy, int snapshot, boolean isRawPath, INodesInPath iip) + throws IOException { + if (needLocation) { + return createLocatedFileStatus(fsd, path, node, storagePolicy, + snapshot, isRawPath, iip); + } else { + return createFileStatus(fsd, path, node, storagePolicy, snapshot, + isRawPath, iip); + } + } + + /** + * Create FileStatus by file INode + */ + static HdfsFileStatus createFileStatus( + FSDirectory fsd, byte[] path, INode node, byte storagePolicy, + int snapshot, boolean isRawPath, INodesInPath iip) throws IOException { + long size = 0; // length is zero for directories + short replication = 0; + long blocksize = 0; + final boolean isEncrypted; + + final FileEncryptionInfo feInfo = isRawPath ? null : + fsd.getFileEncryptionInfo(node, snapshot, iip); + + if (node.isFile()) { + final INodeFile fileNode = node.asFile(); + size = fileNode.computeFileSize(snapshot); + replication = fileNode.getFileReplication(snapshot); + blocksize = fileNode.getPreferredBlockSize(); + isEncrypted = (feInfo != null) || + (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node))); + } else { + isEncrypted = fsd.isInAnEZ(INodesInPath.fromINode(node)); + } + + int childrenNum = node.isDirectory() ? + node.asDirectory().getChildrenNum(snapshot) : 0; + + return new HdfsFileStatus( + size, + node.isDirectory(), + replication, + blocksize, + node.getModificationTime(snapshot), + node.getAccessTime(snapshot), + getPermissionForFileStatus(node, snapshot, isEncrypted), + node.getUserName(snapshot), + node.getGroupName(snapshot), + node.isSymlink() ? node.asSymlink().getSymlink() : null, + path, + node.getId(), + childrenNum, + feInfo, + storagePolicy); + } + + /** + * Create FileStatus with location info by file INode + */ + private static HdfsLocatedFileStatus createLocatedFileStatus( + FSDirectory fsd, byte[] path, INode node, byte storagePolicy, + int snapshot, boolean isRawPath, INodesInPath iip) throws IOException { + assert fsd.hasReadLock(); + long size = 0; // length is zero for directories + short replication = 0; + long blocksize = 0; + LocatedBlocks loc = null; + final boolean isEncrypted; + final FileEncryptionInfo feInfo = isRawPath ? null : + fsd.getFileEncryptionInfo(node, snapshot, iip); + if (node.isFile()) { + final INodeFile fileNode = node.asFile(); + size = fileNode.computeFileSize(snapshot); + replication = fileNode.getFileReplication(snapshot); + blocksize = fileNode.getPreferredBlockSize(); + + final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID; + final boolean isUc = !inSnapshot && fileNode.isUnderConstruction(); + final long fileSize = !inSnapshot && isUc ? + fileNode.computeFileSizeNotIncludingLastUcBlock() : size; + + loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks( + fileNode.getBlocks(), fileSize, isUc, 0L, size, false, + inSnapshot, feInfo); + if (loc == null) { + loc = new LocatedBlocks(); + } + isEncrypted = (feInfo != null) || + (isRawPath && fsd.isInAnEZ(INodesInPath.fromINode(node))); + } else { + isEncrypted = fsd.isInAnEZ(INodesInPath.fromINode(node)); + } + int childrenNum = node.isDirectory() ? + node.asDirectory().getChildrenNum(snapshot) : 0; + + HdfsLocatedFileStatus status = + new HdfsLocatedFileStatus(size, node.isDirectory(), replication, + blocksize, node.getModificationTime(snapshot), + node.getAccessTime(snapshot), + getPermissionForFileStatus(node, snapshot, isEncrypted), + node.getUserName(snapshot), node.getGroupName(snapshot), + node.isSymlink() ? node.asSymlink().getSymlink() : null, path, + node.getId(), loc, childrenNum, feInfo, storagePolicy); + // Set caching information for the located blocks. + if (loc != null) { + CacheManager cacheManager = fsd.getFSNamesystem().getCacheManager(); + for (LocatedBlock lb: loc.getLocatedBlocks()) { + cacheManager.setCachedLocations(lb); + } + } + return status; + } + + /** + * Returns an inode's FsPermission for use in an outbound FileStatus. If the + * inode has an ACL or is for an encrypted file/dir, then this method will + * return an FsPermissionExtension. + * + * @param node INode to check + * @param snapshot int snapshot ID + * @param isEncrypted boolean true if the file/dir is encrypted + * @return FsPermission from inode, with ACL bit on if the inode has an ACL + * and encrypted bit on if it represents an encrypted file/dir. + */ + private static FsPermission getPermissionForFileStatus( + INode node, int snapshot, boolean isEncrypted) { + FsPermission perm = node.getFsPermission(snapshot); + boolean hasAcl = node.getAclFeature(snapshot) != null; + if (hasAcl || isEncrypted) { + perm = new FsPermissionExtension(perm, hasAcl, isEncrypted); + } + return perm; + } + + private static ContentSummary getContentSummaryInt( + FSDirectory fsd, String src) throws IOException { + String srcs = FSDirectory.normalizePath(src); + fsd.readLock(); + try { + INode targetNode = fsd.getNode(srcs, false); + if (targetNode == null) { + throw new FileNotFoundException("File does not exist: " + srcs); + } + else { + // Make it relinquish locks everytime contentCountLimit entries are + // processed. 0 means disabled. I.e. blocking for the entire duration. + ContentSummaryComputationContext cscc = + new ContentSummaryComputationContext(fsd, fsd.getFSNamesystem(), + fsd.getContentCountLimit()); + ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc); + fsd.addYieldCount(cscc.getYieldCount()); + return cs; + } + } finally { + fsd.readUnlock(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 2caaabdc75c..0c0b2af077f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -63,16 +63,11 @@ import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException; import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException; -import org.apache.hadoop.hdfs.protocol.FsPermissionExtension; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.SnapshotException; @@ -86,16 +81,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount; -import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; -import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root; import org.apache.hadoop.hdfs.util.ByteArray; import org.apache.hadoop.hdfs.util.ChunkedArrayList; -import org.apache.hadoop.hdfs.util.ReadOnlyList; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -259,7 +252,7 @@ public class FSDirectory implements Closeable { ezManager = new EncryptionZoneManager(this, conf); } - private FSNamesystem getFSNamesystem() { + FSNamesystem getFSNamesystem() { return namesystem; } @@ -276,6 +269,14 @@ public class FSDirectory implements Closeable { return isPermissionEnabled; } + int getLsLimit() { + return lsLimit; + } + + int getContentCountLimit() { + return contentCountLimit; + } + FSEditLog getEditLog() { return editLog; } @@ -1343,172 +1344,12 @@ public class FSDirectory implements Closeable { return removed; } - private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) { + byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) { return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy : parentPolicy; } - /** - * Get a partial listing of the indicated directory - * - * We will stop when any of the following conditions is met: - * 1) this.lsLimit files have been added - * 2) needLocation is true AND enough files have been added such - * that at least this.lsLimit block locations are in the response - * - * @param src the directory name - * @param startAfter the name to start listing after - * @param needLocation if block locations are returned - * @return a partial listing starting after startAfter - */ - DirectoryListing getListing(String src, byte[] startAfter, - boolean needLocation, boolean isSuperUser) - throws UnresolvedLinkException, IOException { - String srcs = normalizePath(src); - final boolean isRawPath = isReservedRawName(src); - - readLock(); - try { - if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) { - return getSnapshotsListing(srcs, startAfter); - } - final INodesInPath inodesInPath = getINodesInPath(srcs, true); - final INode[] inodes = inodesInPath.getINodes(); - final int snapshot = inodesInPath.getPathSnapshotId(); - final INode targetNode = inodes[inodes.length - 1]; - if (targetNode == null) - return null; - byte parentStoragePolicy = isSuperUser ? - targetNode.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED; - - if (!targetNode.isDirectory()) { - return new DirectoryListing( - new HdfsFileStatus[]{createFileStatus(HdfsFileStatus.EMPTY_NAME, - targetNode, needLocation, parentStoragePolicy, snapshot, - isRawPath, inodesInPath)}, 0); - } - - final INodeDirectory dirInode = targetNode.asDirectory(); - final ReadOnlyList contents = dirInode.getChildrenList(snapshot); - int startChild = INodeDirectory.nextChild(contents, startAfter); - int totalNumChildren = contents.size(); - int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit); - int locationBudget = this.lsLimit; - int listingCnt = 0; - HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing]; - for (int i=0; i0; i++) { - INode cur = contents.get(startChild+i); - byte curPolicy = isSuperUser && !cur.isSymlink()? - cur.getLocalStoragePolicyID(): - BlockStoragePolicySuite.ID_UNSPECIFIED; - listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation, - getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot, - isRawPath, inodesInPath); - listingCnt++; - if (needLocation) { - // Once we hit lsLimit locations, stop. - // This helps to prevent excessively large response payloads. - // Approximate #locations with locatedBlockCount() * repl_factor - LocatedBlocks blks = - ((HdfsLocatedFileStatus)listing[i]).getBlockLocations(); - locationBudget -= (blks == null) ? 0 : - blks.locatedBlockCount() * listing[i].getReplication(); - } - } - // truncate return array if necessary - if (listingCnt < numOfListing) { - listing = Arrays.copyOf(listing, listingCnt); - } - return new DirectoryListing( - listing, totalNumChildren-startChild-listingCnt); - } finally { - readUnlock(); - } - } - - /** - * Get a listing of all the snapshots of a snapshottable directory - */ - private DirectoryListing getSnapshotsListing(String src, byte[] startAfter) - throws UnresolvedLinkException, IOException { - Preconditions.checkState(hasReadLock()); - Preconditions.checkArgument( - src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR), - "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR); - - final String dirPath = normalizePath(src.substring(0, - src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length())); - - final INode node = this.getINode(dirPath); - final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath); - final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature(); - if (sf == null) { - throw new SnapshotException( - "Directory is not a snapshottable directory: " + dirPath); - } - final ReadOnlyList snapshots = sf.getSnapshotList(); - int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter); - skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1; - int numOfListing = Math.min(snapshots.size() - skipSize, this.lsLimit); - final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing]; - for (int i = 0; i < numOfListing; i++) { - Root sRoot = snapshots.get(i + skipSize).getRoot(); - listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot, - BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, - false, null); - } - return new DirectoryListing( - listing, snapshots.size() - skipSize - numOfListing); - } - - /** Get the file info for a specific file. - * @param src The string representation of the path to the file - * @param resolveLink whether to throw UnresolvedLinkException - * @param isRawPath true if a /.reserved/raw pathname was passed by the user - * @param includeStoragePolicy whether to include storage policy - * @return object containing information regarding the file - * or null if file not found - */ - HdfsFileStatus getFileInfo(String src, boolean resolveLink, - boolean isRawPath, boolean includeStoragePolicy) - throws IOException { - String srcs = normalizePath(src); - readLock(); - try { - if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) { - return getFileInfo4DotSnapshot(srcs); - } - final INodesInPath inodesInPath = getINodesInPath(srcs, resolveLink); - final INode[] inodes = inodesInPath.getINodes(); - final INode i = inodes[inodes.length - 1]; - byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ? - i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED; - return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i, - policyId, inodesInPath.getPathSnapshotId(), isRawPath, - inodesInPath); - } finally { - readUnlock(); - } - } - - /** - * Currently we only support "ls /xxx/.snapshot" which will return all the - * snapshots of a directory. The FSCommand Ls will first call getFileInfo to - * make sure the file/directory exists (before the real getListing call). - * Since we do not have a real INode for ".snapshot", we return an empty - * non-null HdfsFileStatus here. - */ - private HdfsFileStatus getFileInfo4DotSnapshot(String src) - throws UnresolvedLinkException { - if (getINode4DotSnapshot(src) != null) { - return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null, - HdfsFileStatus.EMPTY_NAME, -1L, 0, null, - BlockStoragePolicySuite.ID_UNSPECIFIED); - } - return null; - } - - private INode getINode4DotSnapshot(String src) throws UnresolvedLinkException { + INode getINode4DotSnapshot(String src) throws UnresolvedLinkException { Preconditions.checkArgument( src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR), "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR); @@ -2090,36 +1931,15 @@ public class FSDirectory implements Closeable { return src; } - ContentSummary getContentSummary(String src) - throws FileNotFoundException, UnresolvedLinkException { - String srcs = normalizePath(src); - readLock(); - try { - INode targetNode = getNode(srcs, false); - if (targetNode == null) { - throw new FileNotFoundException("File does not exist: " + srcs); - } - else { - // Make it relinquish locks everytime contentCountLimit entries are - // processed. 0 means disabled. I.e. blocking for the entire duration. - ContentSummaryComputationContext cscc = - - new ContentSummaryComputationContext(this, getFSNamesystem(), - contentCountLimit); - ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc); - yieldCount += cscc.getYieldCount(); - return cs; - } - } finally { - readUnlock(); - } - } - @VisibleForTesting public long getYieldCount() { return yieldCount; } + void addYieldCount(long value) { + yieldCount += value; + } + public INodeMap getINodeMap() { return inodeMap; } @@ -2328,155 +2148,6 @@ public class FSDirectory implements Closeable { } } - /** - * create an hdfs file status from an inode - * - * @param path the local name - * @param node inode - * @param needLocation if block locations need to be included or not - * @param isRawPath true if this is being called on behalf of a path in - * /.reserved/raw - * @param iip - * @return a file status - * @throws IOException if any error occurs - */ - private HdfsFileStatus createFileStatus(byte[] path, INode node, - boolean needLocation, byte storagePolicy, int snapshot, - boolean isRawPath, INodesInPath iip) - throws IOException { - if (needLocation) { - return createLocatedFileStatus(path, node, storagePolicy, snapshot, - isRawPath, iip); - } else { - return createFileStatus(path, node, storagePolicy, snapshot, - isRawPath, iip); - } - } - - /** - * Create FileStatus by file INode - */ - HdfsFileStatus createFileStatus(byte[] path, INode node, byte storagePolicy, - int snapshot, boolean isRawPath, INodesInPath iip) throws IOException { - long size = 0; // length is zero for directories - short replication = 0; - long blocksize = 0; - final boolean isEncrypted; - - final FileEncryptionInfo feInfo = isRawPath ? null : - getFileEncryptionInfo(node, snapshot, iip); - - boolean isLazyPersist = false; - if (node.isFile()) { - final INodeFile fileNode = node.asFile(); - size = fileNode.computeFileSize(snapshot); - replication = fileNode.getFileReplication(snapshot); - blocksize = fileNode.getPreferredBlockSize(); - isEncrypted = (feInfo != null) || - (isRawPath && isInAnEZ(INodesInPath.fromINode(node))); - } else { - isEncrypted = isInAnEZ(INodesInPath.fromINode(node)); - } - - int childrenNum = node.isDirectory() ? - node.asDirectory().getChildrenNum(snapshot) : 0; - - return new HdfsFileStatus( - size, - node.isDirectory(), - replication, - blocksize, - node.getModificationTime(snapshot), - node.getAccessTime(snapshot), - getPermissionForFileStatus(node, snapshot, isEncrypted), - node.getUserName(snapshot), - node.getGroupName(snapshot), - node.isSymlink() ? node.asSymlink().getSymlink() : null, - path, - node.getId(), - childrenNum, - feInfo, - storagePolicy); - } - - /** - * Create FileStatus with location info by file INode - */ - private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path, INode node, - byte storagePolicy, int snapshot, boolean isRawPath, - INodesInPath iip) throws IOException { - assert hasReadLock(); - long size = 0; // length is zero for directories - short replication = 0; - long blocksize = 0; - LocatedBlocks loc = null; - final boolean isEncrypted; - final FileEncryptionInfo feInfo = isRawPath ? null : - getFileEncryptionInfo(node, snapshot, iip); - if (node.isFile()) { - final INodeFile fileNode = node.asFile(); - size = fileNode.computeFileSize(snapshot); - replication = fileNode.getFileReplication(snapshot); - blocksize = fileNode.getPreferredBlockSize(); - - final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID; - final boolean isUc = !inSnapshot && fileNode.isUnderConstruction(); - final long fileSize = !inSnapshot && isUc ? - fileNode.computeFileSizeNotIncludingLastUcBlock() : size; - - loc = getFSNamesystem().getBlockManager().createLocatedBlocks( - fileNode.getBlocks(), fileSize, isUc, 0L, size, false, - inSnapshot, feInfo); - if (loc == null) { - loc = new LocatedBlocks(); - } - isEncrypted = (feInfo != null) || - (isRawPath && isInAnEZ(INodesInPath.fromINode(node))); - } else { - isEncrypted = isInAnEZ(INodesInPath.fromINode(node)); - } - int childrenNum = node.isDirectory() ? - node.asDirectory().getChildrenNum(snapshot) : 0; - - HdfsLocatedFileStatus status = - new HdfsLocatedFileStatus(size, node.isDirectory(), replication, - blocksize, node.getModificationTime(snapshot), - node.getAccessTime(snapshot), - getPermissionForFileStatus(node, snapshot, isEncrypted), - node.getUserName(snapshot), node.getGroupName(snapshot), - node.isSymlink() ? node.asSymlink().getSymlink() : null, path, - node.getId(), loc, childrenNum, feInfo, storagePolicy); - // Set caching information for the located blocks. - if (loc != null) { - CacheManager cacheManager = namesystem.getCacheManager(); - for (LocatedBlock lb: loc.getLocatedBlocks()) { - cacheManager.setCachedLocations(lb); - } - } - return status; - } - - /** - * Returns an inode's FsPermission for use in an outbound FileStatus. If the - * inode has an ACL or is for an encrypted file/dir, then this method will - * return an FsPermissionExtension. - * - * @param node INode to check - * @param snapshot int snapshot ID - * @param isEncrypted boolean true if the file/dir is encrypted - * @return FsPermission from inode, with ACL bit on if the inode has an ACL - * and encrypted bit on if it represents an encrypted file/dir. - */ - private static FsPermission getPermissionForFileStatus(INode node, - int snapshot, boolean isEncrypted) { - FsPermission perm = node.getFsPermission(snapshot); - boolean hasAcl = node.getAclFeature(snapshot) != null; - if (hasAcl || isEncrypted) { - perm = new FsPermissionExtension(perm, hasAcl, isEncrypted); - } - return perm; - } - /** * Add the specified path into the namespace. */ @@ -3324,6 +2995,7 @@ public class FSDirectory implements Closeable { HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink) throws IOException { return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation()) - ? getFileInfo(path, resolveSymlink, false, false) : null; + ? FSDirStatAndListingOp.getFileInfo(this, path, resolveSymlink, false, + false) : null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index b22b156f51f..9e557b31aff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -373,8 +373,8 @@ public class FSEditLogLoader { // add the op into retry cache if necessary if (toAddRetryCache) { - HdfsFileStatus stat = fsNamesys.dir.createFileStatus( - HdfsFileStatus.EMPTY_NAME, newFile, + HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus( + fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, newFile, BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, false, iip); fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId, @@ -394,7 +394,8 @@ public class FSEditLogLoader { // add the op into retry cache is necessary if (toAddRetryCache) { - HdfsFileStatus stat = fsNamesys.dir.createFileStatus( + HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus( + fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, newFile, BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, false, iip); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index e877fc282fe..85f5ce0a91a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -146,7 +146,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileStatus; @@ -2468,7 +2467,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, toRemoveBlocks = startFileInternal(pc, src, permissions, holder, clientMachine, create, overwrite, createParent, replication, blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache); - stat = dir.getFileInfo(src, false, + stat = FSDirStatAndListingOp.getFileInfo(dir, src, false, FSDirectory.isReservedRawName(srcArg), true); } catch (StandbyException se) { skipSync = true; @@ -2923,8 +2922,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, checkNameNodeSafeMode("Cannot append to file" + src); src = dir.resolvePath(pc, src, pathComponents); lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache); - stat = dir.getFileInfo(src, false, FSDirectory.isReservedRawName(srcArg), - true); + stat = FSDirStatAndListingOp.getFileInfo(dir, src, false, + FSDirectory.isReservedRawName(srcArg), true); } catch (StandbyException se) { skipSync = true; throw se; @@ -3921,7 +3920,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * Get the file info for a specific file. * * @param srcArg The string representation of the path to the file - * @param resolveLink whether to throw UnresolvedLinkException + * @param resolveLink whether to throw UnresolvedLinkException * if src refers to a symlink * * @throws AccessControlException if access is denied @@ -3929,63 +3928,37 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * * @return object containing information regarding the file * or null if file not found - * @throws StandbyException + * @throws StandbyException */ - HdfsFileStatus getFileInfo(final String srcArg, boolean resolveLink) - throws AccessControlException, UnresolvedLinkException, - StandbyException, IOException { - String src = srcArg; - if (!DFSUtil.isValidName(src)) { - throw new InvalidPathException("Invalid file name: " + src); - } - HdfsFileStatus stat = null; - FSPermissionChecker pc = getPermissionChecker(); + HdfsFileStatus getFileInfo(final String src, boolean resolveLink) + throws IOException { checkOperation(OperationCategory.READ); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + HdfsFileStatus stat = null; readLock(); try { checkOperation(OperationCategory.READ); - src = dir.resolvePath(pc, src, pathComponents); - boolean isSuperUser = true; - if (isPermissionEnabled) { - checkPermission(pc, src, false, null, null, null, null, false, - resolveLink); - isSuperUser = pc.isSuperUser(); - } - stat = dir.getFileInfo(src, resolveLink, - FSDirectory.isReservedRawName(srcArg), isSuperUser); + stat = FSDirStatAndListingOp.getFileInfo(dir, src, resolveLink); } catch (AccessControlException e) { - logAuditEvent(false, "getfileinfo", srcArg); + logAuditEvent(false, "getfileinfo", src); throw e; } finally { readUnlock(); } - logAuditEvent(true, "getfileinfo", srcArg); + logAuditEvent(true, "getfileinfo", src); return stat; } - + /** * Returns true if the file is closed */ - boolean isFileClosed(final String srcArg) - throws AccessControlException, UnresolvedLinkException, - StandbyException, IOException { - String src = srcArg; - FSPermissionChecker pc = getPermissionChecker(); + boolean isFileClosed(final String src) throws IOException { checkOperation(OperationCategory.READ); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); readLock(); try { - src = dir.resolvePath(pc, src, pathComponents); checkOperation(OperationCategory.READ); - if (isPermissionEnabled) { - checkTraverse(pc, src); - } - return !INodeFile.valueOf(dir.getINode(src), src).isUnderConstruction(); + return FSDirStatAndListingOp.isFileClosed(dir, src); } catch (AccessControlException e) { - if (isAuditEnabled() && isExternalInvocation()) { - logAuditEvent(false, "isFileClosed", srcArg); - } + logAuditEvent(false, "isFileClosed", src); throw e; } finally { readUnlock(); @@ -4182,7 +4155,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, /** * Get the content summary for a specific file/dir. * - * @param srcArg The string representation of the path to the file + * @param src The string representation of the path to the file * * @throws AccessControlException if access is denied * @throws UnresolvedLinkException if a symlink is encountered. @@ -4193,27 +4166,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * @return object containing information regarding the file * or null if file not found */ - ContentSummary getContentSummary(final String srcArg) throws IOException { - String src = srcArg; - FSPermissionChecker pc = getPermissionChecker(); - checkOperation(OperationCategory.READ); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + ContentSummary getContentSummary(final String src) throws IOException { readLock(); boolean success = true; try { - checkOperation(OperationCategory.READ); - src = dir.resolvePath(pc, src, pathComponents); - if (isPermissionEnabled) { - checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE); - } - return dir.getContentSummary(src); - + return FSDirStatAndListingOp.getContentSummary(dir, src); } catch (AccessControlException ace) { success = false; throw ace; } finally { readUnlock(); - logAuditEvent(success, "contentSummary", srcArg); + logAuditEvent(success, "contentSummary", src); } } @@ -4722,58 +4685,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, */ DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) - throws AccessControlException, UnresolvedLinkException, IOException { + throws IOException { + checkOperation(OperationCategory.READ); + DirectoryListing dl = null; + readLock(); try { - return getListingInt(src, startAfter, needLocation); + checkOperation(NameNode.OperationCategory.READ); + dl = FSDirStatAndListingOp.getListingInt(dir, src, startAfter, + needLocation); } catch (AccessControlException e) { logAuditEvent(false, "listStatus", src); throw e; - } - } - - private DirectoryListing getListingInt(final String srcArg, byte[] startAfter, - boolean needLocation) - throws AccessControlException, UnresolvedLinkException, IOException { - String src = srcArg; - DirectoryListing dl; - FSPermissionChecker pc = getPermissionChecker(); - checkOperation(OperationCategory.READ); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); - String startAfterString = new String(startAfter); - readLock(); - try { - checkOperation(OperationCategory.READ); - src = dir.resolvePath(pc, src, pathComponents); - - // Get file name when startAfter is an INodePath - if (FSDirectory.isReservedName(startAfterString)) { - byte[][] startAfterComponents = FSDirectory - .getPathComponentsForReservedPath(startAfterString); - try { - String tmp = FSDirectory.resolvePath(src, startAfterComponents, dir); - byte[][] regularPath = INode.getPathComponents(tmp); - startAfter = regularPath[regularPath.length - 1]; - } catch (IOException e) { - // Possibly the inode is deleted - throw new DirectoryListingStartAfterNotFoundException( - "Can't find startAfter " + startAfterString); - } - } - - boolean isSuperUser = true; - if (isPermissionEnabled) { - if (dir.isDir(src)) { - checkPathAccess(pc, src, FsAction.READ_EXECUTE); - } else { - checkTraverse(pc, src); - } - isSuperUser = pc.isSuperUser(); - } - logAuditEvent(true, "listStatus", srcArg); - dl = dir.getListing(src, startAfter, needLocation, isSuperUser); } finally { readUnlock(); } + logAuditEvent(true, "listStatus", src); return dl; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index c32ed67d6e4..1a42e284d8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -70,7 +70,8 @@ public class NameNodeAdapter { public static HdfsFileStatus getFileInfo(NameNode namenode, String src, boolean resolveLink) throws AccessControlException, UnresolvedLinkException, StandbyException, IOException { - return namenode.getNamesystem().getFileInfo(src, resolveLink); + return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem() + .getFSDirectory(), src, resolveLink); } public static boolean mkdirs(NameNode namenode, String src,