HDFS-8446. Separate safemode related operations in GetBlockLocations(). Contributed by Haohui Mai.

Haohui Mai 2015-06-17 16:21:37 -07:00
parent 16d2412a25
commit 38a16e1a45
7 changed files with 140 additions and 152 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -300,6 +300,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8238. Move ClientProtocol to the hdfs-client.
     (Takanobu Asanuma via wheat9)
 
+    HDFS-8446. Separate safemode related operations in GetBlockLocations().
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java

@@ -474,8 +474,7 @@ private static boolean unprotectedSetTimes(
     // if the last access time update was within the last precision interval, then
     // no need to store access time
-    if (atime <= inodeTime + fsd.getFSNamesystem().getAccessTimePrecision()
-        && !force) {
+    if (atime <= inodeTime + fsd.getAccessTimePrecision() && !force) {
       status = false;
     } else {
       inode.setAccessTime(atime, latest);
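Note: the condition above is HDFS's usual access-time batching rule; the patch only changes where the precision value is read from (FSDirectory instead of FSNamesystem). A small worked sketch of the rule, using made-up timestamps and the default dfs.namenode.accesstime.precision of one hour:

    // Illustrative values only; the default precision is 3600000 ms (1 hour).
    long precision = 3600_000L;
    long inodeTime = 1_434_570_000_000L;   // hypothetical stored access time (ms since epoch)
    long atime     = 1_434_571_800_000L;   // hypothetical new access, 30 minutes later
    boolean force  = false;
    // Same test as unprotectedSetTimes(): inside the precision window, skip the atime write.
    boolean skip = atime <= inodeTime + precision && !force;   // true -> no edit-log entry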

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -43,6 +45,8 @@
 import java.io.IOException;
 import java.util.Arrays;
 
+import static org.apache.hadoop.util.Time.now;
+
 class FSDirStatAndListingOp {
   static DirectoryListing getListingInt(FSDirectory fsd, final String srcArg,
       byte[] startAfter, boolean needLocation) throws IOException {
@@ -137,9 +141,69 @@ static ContentSummary getContentSummary(
     return getContentSummaryInt(fsd, iip);
   }
 
+  /**
+   * Get block locations within the specified range.
+   * @see ClientProtocol#getBlockLocations(String, long, long)
+   * @throws IOException
+   */
+  static GetBlockLocationsResult getBlockLocations(
+      FSDirectory fsd, FSPermissionChecker pc, String src, long offset,
+      long length, boolean needBlockToken) throws IOException {
+    Preconditions.checkArgument(offset >= 0,
+        "Negative offset is not supported. File: " + src);
+    Preconditions.checkArgument(length >= 0,
+        "Negative length is not supported. File: " + src);
+
+    CacheManager cm = fsd.getFSNamesystem().getCacheManager();
+    BlockManager bm = fsd.getBlockManager();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    boolean isReservedName = FSDirectory.isReservedRawName(src);
+    fsd.readLock();
+    try {
+      src = fsd.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = fsd.getINodesInPath(src, true);
+      final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkPathAccess(pc, iip, FsAction.READ);
+        fsd.checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
+      }
+
+      final long fileSize = iip.isSnapshot()
+          ? inode.computeFileSize(iip.getPathSnapshotId())
+          : inode.computeFileSizeNotIncludingLastUcBlock();
+
+      boolean isUc = inode.isUnderConstruction();
+      if (iip.isSnapshot()) {
+        // if src indicates a snapshot file, we need to make sure the returned
+        // blocks do not exceed the size of the snapshot file.
+        length = Math.min(length, fileSize - offset);
+        isUc = false;
+      }
+
+      final FileEncryptionInfo feInfo = isReservedName ? null
+          : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
+
+      final LocatedBlocks blocks = bm.createLocatedBlocks(
+          inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
+          length, needBlockToken, iip.isSnapshot(), feInfo);
+
+      // Set caching information for the located blocks.
+      for (LocatedBlock lb : blocks.getLocatedBlocks()) {
+        cm.setCachedLocations(lb);
+      }
+
+      final long now = now();
+      boolean updateAccessTime = fsd.isAccessTimeSupported()
+          && !iip.isSnapshot()
+          && now > inode.getAccessTime() + fsd.getAccessTimePrecision();
+      return new GetBlockLocationsResult(updateAccessTime, blocks);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
   private static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
-    return inodePolicy != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED ? inodePolicy :
-        parentPolicy;
+    return inodePolicy != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
+        ? inodePolicy : parentPolicy;
   }
 
   /**
@@ -294,13 +358,11 @@ static HdfsFileStatus getFileInfo(
       byte policyId = includeStoragePolicy && !i.isSymlink() ?
           i.getStoragePolicyID() :
           HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      INodeAttributes nodeAttrs = getINodeAttributes(
-          fsd, path, HdfsFileStatus.EMPTY_NAME, i, src.getPathSnapshotId());
-      return createFileStatus(
-          fsd, HdfsFileStatus.EMPTY_NAME,
-          i, nodeAttrs, policyId,
-          src.getPathSnapshotId(),
-          isRawPath, src);
+      INodeAttributes nodeAttrs = getINodeAttributes(fsd, path,
+          HdfsFileStatus.EMPTY_NAME,
+          i, src.getPathSnapshotId());
+      return createFileStatus(fsd, HdfsFileStatus.EMPTY_NAME, i, nodeAttrs,
+          policyId, src.getPathSnapshotId(), isRawPath, src);
     } finally {
       fsd.readUnlock();
     }
@@ -520,4 +582,17 @@ private static ContentSummary getContentSummaryInt(FSDirectory fsd,
       fsd.readUnlock();
     }
   }
+
+  static class GetBlockLocationsResult {
+    final boolean updateAccessTime;
+    final LocatedBlocks blocks;
+    boolean updateAccessTime() {
+      return updateAccessTime;
+    }
+    private GetBlockLocationsResult(
+        boolean updateAccessTime, LocatedBlocks blocks) {
+      this.updateAccessTime = updateAccessTime;
+      this.blocks = blocks;
+    }
+  }
 }
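Taken together, the relocated helper and its result type give callers the located blocks plus a flag saying whether an atime update is still owed. A condensed, hedged sketch of the intended call pattern, distilled from the FSNamesystem changes further down (fsd and pc are assumed to be in scope, the path and length are made up, and locking, auditing, and safemode handling are omitted):

    // Illustrative only: how a NameNode-side caller consumes the relocated helper.
    FSDirStatAndListingOp.GetBlockLocationsResult res =
        FSDirStatAndListingOp.getBlockLocations(
            fsd, pc, "/user/alice/data.bin", 0L, Long.MAX_VALUE, true);
    LocatedBlocks blocks = res.blocks;      // what the RPC ultimately returns
    if (res.updateAccessTime()) {
      // The read path decided atime is stale; the caller performs that
      // mutation separately (FSNamesystem does it under the write lock).
    }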

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -80,6 +80,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
 import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
 
 /**
@@ -92,6 +93,7 @@
 @InterfaceAudience.Private
 public class FSDirectory implements Closeable {
   static final Logger LOG = LoggerFactory.getLogger(FSDirectory.class);
+
   private static INodeDirectory createRoot(FSNamesystem namesystem) {
     final INodeDirectory r = new INodeDirectory(
         INodeId.ROOT_INODE_ID,
@@ -328,6 +330,9 @@ boolean isStoragePolicyEnabled() {
   boolean isAccessTimeSupported() {
     return accessTimePrecision > 0;
   }
 
+  long getAccessTimePrecision() {
+    return accessTimePrecision;
+  }
+
   boolean isQuotaByStorageTypeEnabled() {
     return quotaByStorageTypeEnabled;
   }
@@ -1550,6 +1555,21 @@ void checkPermission(FSPermissionChecker pc, INodesInPath iip,
     }
   }
 
+  void checkUnreadableBySuperuser(
+      FSPermissionChecker pc, INode inode, int snapshotId)
+      throws IOException {
+    if (pc.isSuperUser()) {
+      for (XAttr xattr : FSDirXAttrOp.getXAttrs(this, inode, snapshotId)) {
+        if (XAttrHelper.getPrefixName(xattr).
+            equals(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER)) {
+          throw new AccessControlException(
+              "Access is denied for " + pc.getUser() + " since the superuser "
+              + "is not allowed to perform this operation.");
+        }
+      }
+    }
+  }
+
   HdfsFileStatus getAuditFileInfo(INodesInPath iip)
       throws IOException {
     return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
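For orientation: checkUnreadableBySuperuser() enforces the security.hdfs.unreadable.by.superuser extended attribute, so a file carrying it refuses reads even from the HDFS superuser. A hedged sketch of how a file owner might set it (the path and configuration are hypothetical; the xattr is set with no value):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: protect a file from superuser reads via the xattr
    // that checkUnreadableBySuperuser() looks for.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    fs.setXAttr(new Path("/user/alice/secret.bin"),
        "security.hdfs.unreadable.by.superuser", null);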

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -34,8 +34,6 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY;
@@ -90,6 +88,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.monotonicNow;
@@ -172,7 +171,6 @@
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
-import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -472,9 +470,6 @@ private void logAuditEvent(boolean succeeded,
   private final long minBlockSize;         // minimum block size
   final long maxBlocksPerFile;             // maximum # of blocks per file
 
-  // precision of access times.
-  private final long accessTimePrecision;
-
   /** Lock to protect FSNamesystem. */
   private final FSNamesystemLock fsLock;
@@ -788,8 +783,6 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
         DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
     this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
-    this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
-        DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT);
     this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
     LOG.info("Append Enabled: " + supportAppends);
@@ -1621,14 +1614,6 @@ FsServerDefaults getServerDefaults() throws StandbyException {
     return serverDefaults;
   }
 
-  long getAccessTimePrecision() {
-    return accessTimePrecision;
-  }
-
-  private boolean isAccessTimeSupported() {
-    return accessTimePrecision > 0;
-  }
-
   /////////////////////////////////////////////////////////
   //
   // These methods are called by HadoopFS clients
@@ -1679,19 +1664,6 @@ void setOwner(String src, String username, String group)
     logAuditEvent(true, "setOwner", src, null, auditStat);
   }
 
-  static class GetBlockLocationsResult {
-    final boolean updateAccessTime;
-    final LocatedBlocks blocks;
-    boolean updateAccessTime() {
-      return updateAccessTime;
-    }
-    private GetBlockLocationsResult(
-        boolean updateAccessTime, LocatedBlocks blocks) {
-      this.updateAccessTime = updateAccessTime;
-      this.blocks = blocks;
-    }
-  }
-
   /**
    * Get block locations within the specified range.
    * @see ClientProtocol#getBlockLocations(String, long, long)
@@ -1704,7 +1676,23 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      res = getBlockLocations(pc, srcArg, offset, length, true, true);
+      res = FSDirStatAndListingOp.getBlockLocations(
+          dir, pc, srcArg, offset, length, true);
+
+      if (isInSafeMode()) {
+        for (LocatedBlock b : res.blocks.getLocatedBlocks()) {
+          // if safemode & no block locations yet then throw safemodeException
+          if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
+            SafeModeException se = newSafemodeException(
+                "Zero blocklocations for " + srcArg);
+            if (haEnabled && haContext != null &&
+                haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+              throw new RetriableException(se);
+            } else {
+              throw se;
+            }
+          }
+        }
+      }
     } catch (AccessControlException e) {
       logAuditEvent(false, "open", srcArg);
       throw e;
@@ -1714,7 +1702,7 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
     logAuditEvent(true, "open", srcArg);
 
-    if (res.updateAccessTime()) {
+    if (!isInSafeMode() && res.updateAccessTime()) {
       byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(
           srcArg);
       String src = srcArg;
@@ -1744,7 +1732,7 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
         final INodesInPath iip = dir.getINodesInPath(src, true);
         INode inode = iip.getLastINode();
         boolean updateAccessTime = inode != null &&
-            now > inode.getAccessTime() + getAccessTimePrecision();
+            now > inode.getAccessTime() + dir.getAccessTimePrecision();
         if (!isInSafeMode() && updateAccessTime) {
           boolean changed = FSDirAttrOp.setTimes(dir,
               inode, -1, now, false, iip.getLatestSnapshotId());
@@ -1775,88 +1763,6 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
     return blocks;
   }
 
-  /**
-   * Get block locations within the specified range.
-   * @see ClientProtocol#getBlockLocations(String, long, long)
-   * @throws IOException
-   */
-  GetBlockLocationsResult getBlockLocations(
-      FSPermissionChecker pc, String src, long offset, long length,
-      boolean needBlockToken, boolean checkSafeMode) throws IOException {
-    if (offset < 0) {
-      throw new HadoopIllegalArgumentException(
-          "Negative offset is not supported. File: " + src);
-    }
-    if (length < 0) {
-      throw new HadoopIllegalArgumentException(
-          "Negative length is not supported. File: " + src);
-    }
-    final GetBlockLocationsResult ret = getBlockLocationsInt(
-        pc, src, offset, length, needBlockToken);
-
-    if (checkSafeMode && isInSafeMode()) {
-      for (LocatedBlock b : ret.blocks.getLocatedBlocks()) {
-        // if safemode & no block locations yet then throw safemodeException
-        if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
-          SafeModeException se = newSafemodeException(
-              "Zero blocklocations for " + src);
-          if (haEnabled && haContext != null &&
-              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
-            throw new RetriableException(se);
-          } else {
-            throw se;
-          }
-        }
-      }
-    }
-    return ret;
-  }
-
-  private GetBlockLocationsResult getBlockLocationsInt(
-      FSPermissionChecker pc, final String srcArg, long offset, long length,
-      boolean needBlockToken)
-      throws IOException {
-    String src = srcArg;
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    src = dir.resolvePath(pc, srcArg, pathComponents);
-    final INodesInPath iip = dir.getINodesInPath(src, true);
-    final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
-    if (isPermissionEnabled) {
-      dir.checkPathAccess(pc, iip, FsAction.READ);
-      checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
-    }
-
-    final long fileSize = iip.isSnapshot()
-        ? inode.computeFileSize(iip.getPathSnapshotId())
-        : inode.computeFileSizeNotIncludingLastUcBlock();
-
-    boolean isUc = inode.isUnderConstruction();
-    if (iip.isSnapshot()) {
-      // if src indicates a snapshot file, we need to make sure the returned
-      // blocks do not exceed the size of the snapshot file.
-      length = Math.min(length, fileSize - offset);
-      isUc = false;
-    }
-
-    final FileEncryptionInfo feInfo =
-        FSDirectory.isReservedRawName(srcArg) ? null
-            : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
-
-    final LocatedBlocks blocks = blockManager.createLocatedBlocks(
-        inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
-        length, needBlockToken, iip.isSnapshot(), feInfo);
-
-    // Set caching information for the located blocks.
-    for (LocatedBlock lb : blocks.getLocatedBlocks()) {
-      cacheManager.setCachedLocations(lb);
-    }
-
-    final long now = now();
-    boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
-        && !iip.isSnapshot()
-        && now > inode.getAccessTime() + getAccessTimePrecision();
-    return new GetBlockLocationsResult(updateAccessTime, blocks);
-  }
-
   /**
    * Moves all the blocks from {@code srcs} and appends them to {@code target}
    * To avoid rollbacks we will verify validity of ALL of the args
@@ -3909,8 +3815,7 @@ DirectoryListing getListing(String src, byte[] startAfter,
     readLock();
     try {
       checkOperation(NameNode.OperationCategory.READ);
-      dl = FSDirStatAndListingOp.getListingInt(dir, src, startAfter,
-          needLocation);
+      dl = getListingInt(dir, src, startAfter, needLocation);
     } catch (AccessControlException e) {
       logAuditEvent(false, "listStatus", src);
       throw e;
@@ -5303,21 +5208,6 @@ PermissionStatus createFsOwnerPermissions(FsPermission permission) {
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
 
-  private void checkUnreadableBySuperuser(FSPermissionChecker pc,
-      INode inode, int snapshotId)
-      throws IOException {
-    if (pc.isSuperUser()) {
-      for (XAttr xattr : FSDirXAttrOp.getXAttrs(dir, inode, snapshotId)) {
-        if (XAttrHelper.getPrefixName(xattr).
-            equals(SECURITY_XATTR_UNREADABLE_BY_SUPERUSER)) {
-          throw new AccessControlException("Access is denied for " +
-              pc.getUser() + " since the superuser is not allowed to " +
-              "perform this operation.");
-        }
-      }
-    }
-  }
-
   @Override
   public void checkSuperuserPrivilege()
       throws AccessControlException {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -481,8 +481,9 @@ private LocatedBlocks getBlockLocations(String path, HdfsFileStatus file)
     FSNamesystem fsn = namenode.getNamesystem();
     fsn.readLock();
     try {
-      blocks = fsn.getBlockLocations(
-          fsn.getPermissionChecker(), path, 0, fileLen, false, false)
+      blocks = FSDirStatAndListingOp.getBlockLocations(
+          fsn.getFSDirectory(), fsn.getPermissionChecker(),
+          path, 0, fileLen, false)
           .blocks;
     } catch (FileNotFoundException fnfe) {
       blocks = null;

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -1148,20 +1148,21 @@ public void testFsckFileNotFound() throws Exception {
     Configuration conf = new Configuration();
     NameNode namenode = mock(NameNode.class);
     NetworkTopology nettop = mock(NetworkTopology.class);
-    Map<String,String[]> pmap = new HashMap<String, String[]>();
+    Map<String,String[]> pmap = new HashMap<>();
     Writer result = new StringWriter();
     PrintWriter out = new PrintWriter(result, true);
     InetAddress remoteAddress = InetAddress.getLocalHost();
     FSNamesystem fsName = mock(FSNamesystem.class);
+    FSDirectory fsd = mock(FSDirectory.class);
     BlockManager blockManager = mock(BlockManager.class);
     DatanodeManager dnManager = mock(DatanodeManager.class);
+    INodesInPath iip = mock(INodesInPath.class);
 
     when(namenode.getNamesystem()).thenReturn(fsName);
-    when(fsName.getBlockLocations(any(FSPermissionChecker.class), anyString(),
-        anyLong(), anyLong(),
-        anyBoolean(), anyBoolean()))
-        .thenThrow(new FileNotFoundException());
     when(fsName.getBlockManager()).thenReturn(blockManager);
+    when(fsName.getFSDirectory()).thenReturn(fsd);
+    when(fsd.getFSNamesystem()).thenReturn(fsName);
+    when(fsd.getINodesInPath(anyString(), anyBoolean())).thenReturn(iip);
     when(blockManager.getDatanodeManager()).thenReturn(dnManager);
 
     NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
@@ -1179,8 +1180,7 @@ public void testFsckFileNotFound() throws Exception {
     String owner = "foo";
     String group = "bar";
     byte [] symlink = null;
-    byte [] path = new byte[128];
-    path = DFSUtil.string2Bytes(pathString);
+    byte [] path = DFSUtil.string2Bytes(pathString);
     long fileId = 312321L;
     int numChildren = 1;
     byte storagePolicy = 0;
@@ -1193,7 +1193,7 @@ public void testFsckFileNotFound() throws Exception {
     try {
       fsck.check(pathString, file, res);
     } catch (Exception e) {
-      fail("Unexpected exception "+ e.getMessage());
+      fail("Unexpected exception " + e.getMessage());
     }
     assertTrue(res.toString().contains("HEALTHY"));
   }
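A note on why the rewired mocks above are sufficient: with getBlockLocations() now a static helper on FSDirStatAndListingOp, the test can no longer stub FSNamesystem#getBlockLocations to throw. A plausible reading of the new setup is that the helper's own call path fails instead, with Mockito's default answers supplying the nulls:

    // Sketch of the path fsck.check() takes with the mocks above:
    //   NamenodeFsck -> FSDirStatAndListingOp.getBlockLocations(fsd, ...)
    //     -> fsd.getINodesInPath(src, true)   returns the mocked iip
    //     -> iip.getLastINode()               unstubbed, so Mockito returns null
    //     -> INodeFile.valueOf(null, src)     throws FileNotFoundException
    //   NamenodeFsck catches the FileNotFoundException (blocks = null) and the
    //   check continues, which is why the test still expects a HEALTHY result.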