HDFS-5614. NameNode: implement handling of ACLs in combination with snapshots. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1563304 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-01-31 22:21:19 +00:00
parent d5f4f76a23
commit 14f1f76bf6
21 changed files with 989 additions and 109 deletions

View File

@ -50,6 +50,9 @@ HDFS-4685 (Unreleased)
HDFS-5608. WebHDFS: implement ACL APIs.
(Sachin Jose and Renil Joseph via cnauroth)
HDFS-5614. NameNode: implement handling of ACLs in combination with
snapshots. (cnauroth)
OPTIMIZATIONS
BUG FIXES

View File

@ -2651,6 +2651,7 @@ public class DFSClient implements java.io.Closeable {
throw re.unwrapRemoteException(AccessControlException.class,
AclException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
@ -2666,6 +2667,7 @@ public class DFSClient implements java.io.Closeable {
throw re.unwrapRemoteException(AccessControlException.class,
AclException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
@ -2680,6 +2682,7 @@ public class DFSClient implements java.io.Closeable {
throw re.unwrapRemoteException(AccessControlException.class,
AclException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
@ -2694,6 +2697,7 @@ public class DFSClient implements java.io.Closeable {
throw re.unwrapRemoteException(AccessControlException.class,
AclException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
@ -2708,6 +2712,7 @@ public class DFSClient implements java.io.Closeable {
throw re.unwrapRemoteException(AccessControlException.class,
AclException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);

View File

@ -31,13 +31,13 @@ import org.apache.hadoop.fs.permission.AclEntry;
public class AclFeature implements INode.Feature {
public static final List<AclEntry> EMPTY_ENTRY_LIST = Collections.emptyList();
private List<AclEntry> entries;
private final List<AclEntry> entries;
public AclFeature(List<AclEntry> entries) {
this.entries = entries;
}
public List<AclEntry> getEntries() {
return entries;
}
public void setEntries(List<AclEntry> entries) {
this.entries = entries;
}
}

View File

@ -62,17 +62,17 @@ final class AclStorage {
/**
* Reads the existing extended ACL entries of an inode. This method returns
* only the extended ACL entries stored in the AclFeature. If the inode does
* not have an ACL, then this method returns an empty list.
* not have an ACL, then this method returns an empty list. This method
* supports querying by snapshot ID.
*
* @param inode INodeWithAdditionalFields to read
* @param snapshotId int latest snapshot ID of inode
* @param inode INode to read
* @param snapshotId int ID of snapshot to read
* @return List<AclEntry> containing extended inode ACL entries
*/
public static List<AclEntry> readINodeAcl(INodeWithAdditionalFields inode,
int snapshotId) {
FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
public static List<AclEntry> readINodeAcl(INode inode, int snapshotId) {
FsPermission perm = inode.getFsPermission(snapshotId);
if (perm.getAclBit()) {
return inode.getAclFeature().getEntries();
return inode.getAclFeature(snapshotId).getEntries();
} else {
return Collections.emptyList();
}
@ -85,15 +85,16 @@ final class AclStorage {
* logically has an ACL, even if no ACL has been set explicitly. If the inode
* does not have an extended ACL, then the result is a minimal ACL consising of
* exactly 3 entries that correspond to the owner, group and other permissions.
* This method always reads the inode's current state and does not support
* querying by snapshot ID. This is because the method is intended to support
* ACL modification APIs, which always apply a delta on top of current state.
*
* @param inode INodeWithAdditionalFields to read
* @param snapshotId int latest snapshot ID of inode
* @param inode INode to read
* @return List<AclEntry> containing all logical inode ACL entries
*/
public static List<AclEntry> readINodeLogicalAcl(
INodeWithAdditionalFields inode, int snapshotId) {
public static List<AclEntry> readINodeLogicalAcl(INode inode) {
final List<AclEntry> existingAcl;
FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
FsPermission perm = inode.getFsPermission();
if (perm.getAclBit()) {
// Split ACL entries stored in the feature into access vs. default.
List<AclEntry> featureEntries = inode.getAclFeature().getEntries();
@ -150,13 +151,13 @@ final class AclStorage {
/**
* Completely removes the ACL from an inode.
*
* @param inode INodeWithAdditionalFields to update
* @param inode INode to update
* @param snapshotId int latest snapshot ID of inode
* @throws QuotaExceededException if quota limit is exceeded
*/
public static void removeINodeAcl(INodeWithAdditionalFields inode,
int snapshotId) throws QuotaExceededException {
FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
public static void removeINodeAcl(INode inode, int snapshotId)
throws QuotaExceededException {
FsPermission perm = inode.getFsPermission();
if (perm.getAclBit()) {
List<AclEntry> featureEntries = inode.getAclFeature().getEntries();
final FsAction groupPerm;
@ -176,7 +177,7 @@ final class AclStorage {
}
// Remove the feature and turn off the ACL bit.
inode.removeAclFeature();
inode.removeAclFeature(snapshotId);
FsPermission newPerm = new FsPermission(perm.getUserAction(),
groupPerm, perm.getOtherAction(),
perm.getStickyBit(), false);
@ -189,17 +190,16 @@ final class AclStorage {
* stores the entries to the inode's {@link FsPermission} and
* {@link AclFeature}.
*
* @param inode INodeWithAdditionalFields to update
* @param inode INode to update
* @param newAcl List<AclEntry> containing new ACL entries
* @param snapshotId int latest snapshot ID of inode
* @throws AclException if the ACL is invalid for the given inode
* @throws QuotaExceededException if quota limit is exceeded
*/
public static void updateINodeAcl(INodeWithAdditionalFields inode,
List<AclEntry> newAcl, int snapshotId) throws AclException,
QuotaExceededException {
public static void updateINodeAcl(INode inode, List<AclEntry> newAcl,
int snapshotId) throws AclException, QuotaExceededException {
assert newAcl.size() >= 3;
FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
FsPermission perm = inode.getFsPermission();
final FsPermission newPerm;
if (newAcl.size() > 3) {
// This is an extended ACL. Split entries into access vs. default.
@ -238,17 +238,15 @@ final class AclStorage {
// Add all default entries to the feature.
featureEntries.addAll(defaultEntries);
// Attach entries to the feature, creating a new feature if needed.
AclFeature aclFeature = inode.getAclFeature();
if (aclFeature == null) {
aclFeature = new AclFeature();
inode.addAclFeature(aclFeature);
// Attach entries to the feature.
if (perm.getAclBit()) {
inode.removeAclFeature(snapshotId);
}
aclFeature.setEntries(featureEntries);
inode.addAclFeature(new AclFeature(featureEntries), snapshotId);
} else {
// This is a minimal ACL. Remove the ACL feature if it previously had one.
if (perm.getAclBit()) {
inode.removeAclFeature();
inode.removeAclFeature(snapshotId);
}
// Calculate new permission bits. For a correctly sorted ACL, the owner,

View File

@ -1629,6 +1629,14 @@ public class FSDirectory implements Closeable {
*/
private HdfsFileStatus getFileInfo4DotSnapshot(String src)
throws UnresolvedLinkException {
if (getINode4DotSnapshot(src) != null) {
return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
HdfsFileStatus.EMPTY_NAME, -1L, 0);
}
return null;
}
private INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
Preconditions.checkArgument(
src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
"%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
@ -1640,8 +1648,7 @@ public class FSDirectory implements Closeable {
if (node != null
&& node.isDirectory()
&& node.asDirectory() instanceof INodeDirectorySnapshottable) {
return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
HdfsFileStatus.EMPTY_NAME, -1L, 0);
return node;
}
return null;
}
@ -2697,10 +2704,9 @@ public class FSDirectory implements Closeable {
List<AclEntry> aclSpec) throws IOException {
assert hasWriteLock();
INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
snapshotId);
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.mergeAclEntries(existingAcl,
aclSpec);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
@ -2721,10 +2727,9 @@ public class FSDirectory implements Closeable {
List<AclEntry> aclSpec) throws IOException {
assert hasWriteLock();
INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
snapshotId);
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
existingAcl, aclSpec);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
@ -2745,10 +2750,9 @@ public class FSDirectory implements Closeable {
throws IOException {
assert hasWriteLock();
INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
snapshotId);
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
existingAcl);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
@ -2768,7 +2772,7 @@ public class FSDirectory implements Closeable {
private void unprotectedRemoveAcl(String src) throws IOException {
assert hasWriteLock();
INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
AclStorage.removeINodeAcl(inode, snapshotId);
}
@ -2793,10 +2797,9 @@ public class FSDirectory implements Closeable {
assert hasWriteLock();
INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
snapshotId);
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
aclSpec);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
@ -2804,12 +2807,18 @@ public class FSDirectory implements Closeable {
}
AclStatus getAclStatus(String src) throws IOException {
String srcs = normalizePath(src);
readLock();
try {
INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
final INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(
src, iip);
int snapshotId = iip.getLatestSnapshotId();
// There is no real inode for the path ending in ".snapshot", so return a
// non-null, unpopulated AclStatus. This is similar to getFileInfo.
if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) &&
getINode4DotSnapshot(srcs) != null) {
return new AclStatus.Builder().owner("").group("").build();
}
INodesInPath iip = rootDir.getLastINodeInPath(srcs, true);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getPathSnapshotId();
List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
return new AclStatus.Builder()
.owner(inode.getUserName()).group(inode.getGroupName())
@ -2820,12 +2829,12 @@ public class FSDirectory implements Closeable {
}
}
private static INodeWithAdditionalFields resolveINodeWithAdditionalFields(
String src, INodesInPath iip) throws FileNotFoundException {
private static INode resolveLastINode(String src, INodesInPath iip)
throws FileNotFoundException {
INode inode = iip.getLastINode();
if (!(inode instanceof INodeWithAdditionalFields))
if (inode == null)
throw new FileNotFoundException("cannot find " + src);
return (INodeWithAdditionalFields)inode;
return inode;
}
/**

View File

@ -712,10 +712,12 @@ public class FSImageFormat {
file.toUnderConstruction(clientName, clientMachine, null);
}
if (permissions.getPermission().getAclBit()) {
AclFeature aclFeature = loadAclFeature(in, imgVersion);
if (aclFeature != null) {
file.addAclFeature(aclFeature);
}
}
return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
@ -751,10 +753,12 @@ public class FSImageFormat {
dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
}
if (permissions.getPermission().getAclBit()) {
AclFeature aclFeature = loadAclFeature(in, imgVersion);
if (aclFeature != null) {
dir.addAclFeature(aclFeature);
}
}
if (withSnapshot) {
dir.addSnapshotFeature(null);
@ -802,8 +806,8 @@ public class FSImageFormat {
if (LayoutVersion.supports(Feature.EXTENDED_ACL, imgVersion)) {
AclFsImageProto p = AclFsImageProto
.parseDelimitedFrom((DataInputStream) in);
aclFeature = new AclFeature();
aclFeature.setEntries(PBHelper.convertAclEntry(p.getEntriesList()));
aclFeature = new AclFeature(PBHelper.convertAclEntry(
p.getEntriesList()));
}
return aclFeature;
}
@ -825,9 +829,10 @@ public class FSImageFormat {
final short replication = namesystem.getBlockManager().adjustReplication(
in.readShort());
final long preferredBlockSize = in.readLong();
return new INodeFileAttributes.SnapshotCopy(name, permissions, modificationTime,
accessTime, replication, preferredBlockSize);
AclFeature aclFeature = permissions.getPermission().getAclBit() ?
loadAclFeature(in, layoutVersion) : null;
return new INodeFileAttributes.SnapshotCopy(name, permissions, aclFeature,
modificationTime, accessTime, replication, preferredBlockSize);
}
public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
@ -845,11 +850,14 @@ public class FSImageFormat {
//read quotas
final long nsQuota = in.readLong();
final long dsQuota = in.readLong();
AclFeature aclFeature = permissions.getPermission().getAclBit() ?
loadAclFeature(in, layoutVersion) : null;
return nsQuota == -1L && dsQuota == -1L?
new INodeDirectoryAttributes.SnapshotCopy(name, permissions, modificationTime)
new INodeDirectoryAttributes.SnapshotCopy(name, permissions,
aclFeature, modificationTime)
: new INodeDirectoryAttributes.CopyWithQuota(name, permissions,
modificationTime, nsQuota, dsQuota);
aclFeature, modificationTime, nsQuota, dsQuota);
}
private void loadFilesUnderConstruction(DataInput in,

View File

@ -220,6 +220,7 @@ public class FSImageSerialization {
out.writeShort(file.getFileReplication());
out.writeLong(file.getPreferredBlockSize());
writeAclFeature(file, out);
}
private static void writeQuota(Quota.Counts quota, DataOutput out)
@ -267,6 +268,7 @@ public class FSImageSerialization {
writePermissionStatus(a, out);
out.writeLong(a.getModificationTime());
writeQuota(a.getQuotaCounts(), out);
writeAclFeature(a, out);
}
/**
@ -288,8 +290,9 @@ public class FSImageSerialization {
writePermissionStatus(node, out);
}
private static void writeAclFeature(INodeWithAdditionalFields node,
DataOutput out) throws IOException {
private static void writeAclFeature(INodeAttributes node, DataOutput out)
throws IOException {
if (node.getFsPermission().getAclBit()) {
AclFsImageProto.Builder b = AclFsImageProto.newBuilder();
OutputStream os = (OutputStream) out;
@ -299,6 +302,7 @@ public class FSImageSerialization {
b.build().writeDelimitedTo(os);
}
}
/** Serialize a {@link INodeReference} node */
private static void writeINodeReference(INodeReference ref, DataOutput out,

View File

@ -240,10 +240,8 @@ class FSPermissionChecker {
}
FsPermission mode = inode.getFsPermission(snapshotId);
if (mode.getAclBit()) {
// TODO: handling of INodeReference?
AclFeature aclFeature = inode instanceof INodeWithAdditionalFields ?
((INodeWithAdditionalFields)inode).getAclFeature() : null;
if (aclFeature != null) {
AclFeature aclFeature = inode.getAclFeature(snapshotId);
assert aclFeature != null;
List<AclEntry> featureEntries = aclFeature.getEntries();
// It's possible that the inode has a default ACL but no access ACL.
if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
@ -251,7 +249,6 @@ class FSPermissionChecker {
return;
}
}
}
checkFsPermission(inode, snapshotId, access, mode);
}

View File

@ -154,6 +154,31 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
return nodeToUpdate;
}
abstract AclFeature getAclFeature(int snapshotId);
@Override
public final AclFeature getAclFeature() {
return getAclFeature(Snapshot.CURRENT_STATE_ID);
}
abstract void addAclFeature(AclFeature aclFeature);
final INode addAclFeature(AclFeature aclFeature, int latestSnapshotId)
throws QuotaExceededException {
final INode nodeToUpdate = recordModification(latestSnapshotId);
nodeToUpdate.addAclFeature(aclFeature);
return nodeToUpdate;
}
abstract void removeAclFeature();
final INode removeAclFeature(int latestSnapshotId)
throws QuotaExceededException {
final INode nodeToUpdate = recordModification(latestSnapshotId);
nodeToUpdate.removeAclFeature();
return nodeToUpdate;
}
/**
* @return if the given snapshot id is {@link Snapshot#CURRENT_STATE_ID},
* return this; otherwise return the corresponding snapshot inode.

View File

@ -48,6 +48,9 @@ public interface INodeAttributes {
/** @return the permission information as a long. */
public long getPermissionLong();
/** @return the ACL feature. */
public AclFeature getAclFeature();
/** @return the modification time. */
public long getModificationTime();
@ -58,13 +61,15 @@ public interface INodeAttributes {
public static abstract class SnapshotCopy implements INodeAttributes {
private final byte[] name;
private final long permission;
private final AclFeature aclFeature;
private final long modificationTime;
private final long accessTime;
SnapshotCopy(byte[] name, PermissionStatus permissions,
long modificationTime, long accessTime) {
AclFeature aclFeature, long modificationTime, long accessTime) {
this.name = name;
this.permission = PermissionStatusFormat.toLong(permissions);
this.aclFeature = aclFeature;
this.modificationTime = modificationTime;
this.accessTime = accessTime;
}
@ -72,6 +77,7 @@ public interface INodeAttributes {
SnapshotCopy(INode inode) {
this.name = inode.getLocalNameBytes();
this.permission = inode.getPermissionLong();
this.aclFeature = inode.getAclFeature();
this.modificationTime = inode.getModificationTime();
this.accessTime = inode.getAccessTime();
}
@ -108,6 +114,11 @@ public interface INodeAttributes {
return permission;
}
@Override
public AclFeature getAclFeature() {
return aclFeature;
}
@Override
public final long getModificationTime() {
return modificationTime;

View File

@ -77,8 +77,11 @@ public class INodeDirectory extends INodeWithAdditionalFields
* @param other The INodeDirectory to be copied
* @param adopt Indicate whether or not need to set the parent field of child
* INodes to the new node
* @param featuresToCopy any number of features to copy to the new node.
* The method will do a reference copy, not a deep copy.
*/
public INodeDirectory(INodeDirectory other, boolean adopt, boolean copyFeatures) {
public INodeDirectory(INodeDirectory other, boolean adopt,
Feature... featuresToCopy) {
super(other);
this.children = other.children;
if (adopt && this.children != null) {
@ -86,9 +89,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
child.setParent(this);
}
}
if (copyFeatures) {
this.features = other.features;
}
this.features = featuresToCopy;
}
/** @return true unconditionally. */
@ -231,7 +232,8 @@ public class INodeDirectory extends INodeWithAdditionalFields
public INodeDirectory replaceSelf4INodeDirectory(final INodeMap inodeMap) {
Preconditions.checkState(getClass() != INodeDirectory.class,
"the class is already INodeDirectory, this=%s", this);
return replaceSelf(new INodeDirectory(this, true, true), inodeMap);
return replaceSelf(new INodeDirectory(this, true, this.getFeatures()),
inodeMap);
}
/** Replace itself with the given directory. */

View File

@ -35,8 +35,8 @@ public interface INodeDirectoryAttributes extends INodeAttributes {
public static class SnapshotCopy extends INodeAttributes.SnapshotCopy
implements INodeDirectoryAttributes {
public SnapshotCopy(byte[] name, PermissionStatus permissions,
long modificationTime) {
super(name, permissions, modificationTime, 0L);
AclFeature aclFeature, long modificationTime) {
super(name, permissions, aclFeature, modificationTime, 0L);
}
public SnapshotCopy(INodeDirectory dir) {
@ -62,8 +62,9 @@ public interface INodeDirectoryAttributes extends INodeAttributes {
public CopyWithQuota(byte[] name, PermissionStatus permissions,
long modificationTime, long nsQuota, long dsQuota) {
super(name, permissions, modificationTime);
AclFeature aclFeature, long modificationTime, long nsQuota,
long dsQuota) {
super(name, permissions, aclFeature, modificationTime);
this.nsQuota = nsQuota;
this.dsQuota = dsQuota;
}

View File

@ -41,9 +41,9 @@ public interface INodeFileAttributes extends INodeAttributes {
private final long header;
public SnapshotCopy(byte[] name, PermissionStatus permissions,
long modificationTime, long accessTime,
AclFeature aclFeature, long modificationTime, long accessTime,
short replication, long preferredBlockSize) {
super(name, permissions, modificationTime, accessTime);
super(name, permissions, aclFeature, modificationTime, accessTime);
final long h = HeaderFormat.combineReplication(0L, replication);
header = HeaderFormat.combinePreferredBlockSize(h, preferredBlockSize);

View File

@ -213,6 +213,22 @@ public abstract class INodeReference extends INode {
public final FsPermission getFsPermission(int snapshotId) {
return referred.getFsPermission(snapshotId);
}
@Override
final AclFeature getAclFeature(int snapshotId) {
return referred.getAclFeature(snapshotId);
}
@Override
final void addAclFeature(AclFeature aclFeature) {
referred.addAclFeature(aclFeature);
}
@Override
final void removeAclFeature() {
referred.removeAclFeature();
}
@Override
public final short getFsPermissionShort() {
return referred.getFsPermissionShort();

View File

@ -220,6 +220,15 @@ public abstract class INodeWithAdditionalFields extends INode
return permission;
}
@Override
final AclFeature getAclFeature(int snapshotId) {
if (snapshotId != Snapshot.CURRENT_STATE_ID) {
return getSnapshotINode(snapshotId).getAclFeature();
}
return getFeature(AclFeature.class);
}
@Override
final long getModificationTime(int snapshotId) {
if (snapshotId != Snapshot.CURRENT_STATE_ID) {
@ -318,10 +327,6 @@ public abstract class INodeWithAdditionalFields extends INode
return null;
}
public AclFeature getAclFeature() {
return getFeature(AclFeature.class);
}
public void removeAclFeature() {
AclFeature f = getAclFeature();
Preconditions.checkNotNull(f);
@ -335,4 +340,8 @@ public abstract class INodeWithAdditionalFields extends INode
addFeature(f);
}
public final Feature[] getFeatures() {
return features;
}
}

View File

@ -184,7 +184,7 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
private int snapshotQuota = SNAPSHOT_LIMIT;
public INodeDirectorySnapshottable(INodeDirectory dir) {
super(dir, true, true);
super(dir, true, dir.getFeatures());
// add snapshot feature if the original directory does not have it
if (!isWithSnapshot()) {
addSnapshotFeature(null);

View File

@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
@ -28,12 +29,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.AclFeature;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
/** Snapshot of a sub-tree in the namesystem. */
@InterfaceAudience.Private
public class Snapshot implements Comparable<byte[]> {
@ -139,7 +144,10 @@ public class Snapshot implements Comparable<byte[]> {
/** The root directory of the snapshot. */
static public class Root extends INodeDirectory {
Root(INodeDirectory other) {
super(other, false, false);
// Always preserve ACL.
super(other, false, Lists.newArrayList(
Iterables.filter(Arrays.asList(other.getFeatures()), AclFeature.class))
.toArray(new Feature[0]));
}
@Override

View File

@ -94,6 +94,13 @@ public class TestFSImageWithAcl {
s = cluster.getNamesystem().getAclStatus(p.toString());
returned = Lists.newArrayList(s.getEntries()).toArray(new AclEntry[0]);
Assert.assertArrayEquals(new AclEntry[] { }, returned);
fs.modifyAclEntries(p, Lists.newArrayList(e));
s = cluster.getNamesystem().getAclStatus(p.toString());
returned = Lists.newArrayList(s.getEntries()).toArray(new AclEntry[0]);
Assert.assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "foo", READ_EXECUTE),
aclEntry(ACCESS, GROUP, READ) }, returned);
}
@Test

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.io.IOUtils;
import org.junit.AfterClass;
import org.junit.Before;
@ -797,8 +798,7 @@ public class TestNameNodeAcl {
INode inode = cluster.getNamesystem().getFSDirectory().getRoot()
.getNode(path.toUri().getPath(), false);
assertNotNull(inode);
assertTrue(inode instanceof INodeWithAdditionalFields);
AclFeature aclFeature = ((INodeWithAdditionalFields)inode).getAclFeature();
AclFeature aclFeature = inode.getAclFeature(Snapshot.CURRENT_STATE_ID);
if (expectAclFeature) {
assertNotNull(aclFeature);
} else {

View File

@ -0,0 +1,776 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.*;
import static org.apache.hadoop.fs.permission.AclEntryScope.*;
import static org.apache.hadoop.fs.permission.AclEntryType.*;
import static org.apache.hadoop.fs.permission.FsAction.*;
import static org.junit.Assert.*;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import com.google.common.collect.Lists;
/**
* Tests interaction of ACLs with snapshots.
*/
public class TestAclWithSnapshot {
private static final UserGroupInformation BRUCE =
UserGroupInformation.createUserForTesting("bruce", new String[] { });
private static final UserGroupInformation DIANA =
UserGroupInformation.createUserForTesting("diana", new String[] { });
private static MiniDFSCluster cluster;
private static Configuration conf;
private static FileSystem fsAsBruce, fsAsDiana;
private static DistributedFileSystem hdfs;
private static int pathCount = 0;
private static Path path, snapshotPath;
private static String snapshotName;
@Rule
public ExpectedException exception = ExpectedException.none();
@BeforeClass
public static void init() throws Exception {
conf = new Configuration();
initCluster(true);
}
@AfterClass
public static void shutdown() throws Exception {
IOUtils.cleanup(null, hdfs, fsAsBruce, fsAsDiana);
if (cluster != null) {
cluster.shutdown();
}
}
@Before
public void setUp() {
++pathCount;
path = new Path("/p" + pathCount);
snapshotName = "snapshot" + pathCount;
snapshotPath = new Path(path, new Path(".snapshot", snapshotName));
}
@Test
public void testOriginalAclEnforcedForSnapshotRootAfterChange()
throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, ALL),
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE),
aclEntry(ACCESS, OTHER, NONE));
hdfs.setAcl(path, aclSpec);
assertDirPermissionGranted(fsAsBruce, BRUCE, path);
assertDirPermissionDenied(fsAsDiana, DIANA, path);
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
// Both original and snapshot still have same ACL.
AclStatus s = hdfs.getAclStatus(path);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02750, path);
s = hdfs.getAclStatus(snapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02750, snapshotPath);
assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, READ_EXECUTE),
aclEntry(ACCESS, USER, "diana", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE),
aclEntry(ACCESS, OTHER, NONE));
hdfs.setAcl(path, aclSpec);
// Original has changed, but snapshot still has old ACL.
doSnapshotRootChangeAssertions(path, snapshotPath);
restart(false);
doSnapshotRootChangeAssertions(path, snapshotPath);
restart(true);
doSnapshotRootChangeAssertions(path, snapshotPath);
}
private static void doSnapshotRootChangeAssertions(Path path,
Path snapshotPath) throws Exception {
AclStatus s = hdfs.getAclStatus(path);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "diana", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02550, path);
s = hdfs.getAclStatus(snapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02750, snapshotPath);
assertDirPermissionDenied(fsAsBruce, BRUCE, path);
assertDirPermissionGranted(fsAsDiana, DIANA, path);
assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
}
@Test
public void testOriginalAclEnforcedForSnapshotContentsAfterChange()
throws Exception {
Path filePath = new Path(path, "file1");
Path subdirPath = new Path(path, "subdir1");
Path fileSnapshotPath = new Path(snapshotPath, "file1");
Path subdirSnapshotPath = new Path(snapshotPath, "subdir1");
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0777));
FileSystem.create(hdfs, filePath, FsPermission.createImmutable((short)0600))
.close();
FileSystem.mkdirs(hdfs, subdirPath, FsPermission.createImmutable(
(short)0700));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, READ_EXECUTE),
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE),
aclEntry(ACCESS, OTHER, NONE));
hdfs.setAcl(filePath, aclSpec);
hdfs.setAcl(subdirPath, aclSpec);
assertFilePermissionGranted(fsAsBruce, BRUCE, filePath);
assertFilePermissionDenied(fsAsDiana, DIANA, filePath);
assertDirPermissionGranted(fsAsBruce, BRUCE, subdirPath);
assertDirPermissionDenied(fsAsDiana, DIANA, subdirPath);
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
// Both original and snapshot still have same ACL.
AclEntry[] expected = new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) };
AclStatus s = hdfs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, filePath);
s = hdfs.getAclStatus(subdirPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, subdirPath);
s = hdfs.getAclStatus(fileSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, fileSnapshotPath);
assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
s = hdfs.getAclStatus(subdirSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, subdirSnapshotPath);
assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, READ_EXECUTE),
aclEntry(ACCESS, USER, "diana", ALL),
aclEntry(ACCESS, GROUP, NONE),
aclEntry(ACCESS, OTHER, NONE));
hdfs.setAcl(filePath, aclSpec);
hdfs.setAcl(subdirPath, aclSpec);
// Original has changed, but snapshot still has old ACL.
doSnapshotContentsChangeAssertions(filePath, fileSnapshotPath, subdirPath,
subdirSnapshotPath);
restart(false);
doSnapshotContentsChangeAssertions(filePath, fileSnapshotPath, subdirPath,
subdirSnapshotPath);
restart(true);
doSnapshotContentsChangeAssertions(filePath, fileSnapshotPath, subdirPath,
subdirSnapshotPath);
}
private static void doSnapshotContentsChangeAssertions(Path filePath,
Path fileSnapshotPath, Path subdirPath, Path subdirSnapshotPath)
throws Exception {
AclEntry[] expected = new AclEntry[] {
aclEntry(ACCESS, USER, "diana", ALL),
aclEntry(ACCESS, GROUP, NONE) };
AclStatus s = hdfs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02570, filePath);
assertFilePermissionDenied(fsAsBruce, BRUCE, filePath);
assertFilePermissionGranted(fsAsDiana, DIANA, filePath);
s = hdfs.getAclStatus(subdirPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02570, subdirPath);
assertDirPermissionDenied(fsAsBruce, BRUCE, subdirPath);
assertDirPermissionGranted(fsAsDiana, DIANA, subdirPath);
expected = new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) };
s = hdfs.getAclStatus(fileSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, fileSnapshotPath);
assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
s = hdfs.getAclStatus(subdirSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, subdirSnapshotPath);
assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
}
@Test
public void testOriginalAclEnforcedForSnapshotRootAfterRemoval()
throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, ALL),
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE),
aclEntry(ACCESS, OTHER, NONE));
hdfs.setAcl(path, aclSpec);
assertDirPermissionGranted(fsAsBruce, BRUCE, path);
assertDirPermissionDenied(fsAsDiana, DIANA, path);
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
// Both original and snapshot still have same ACL.
AclStatus s = hdfs.getAclStatus(path);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02750, path);
s = hdfs.getAclStatus(snapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02750, snapshotPath);
assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
hdfs.removeAcl(path);
// Original has changed, but snapshot still has old ACL.
doSnapshotRootRemovalAssertions(path, snapshotPath);
restart(false);
doSnapshotRootRemovalAssertions(path, snapshotPath);
restart(true);
doSnapshotRootRemovalAssertions(path, snapshotPath);
}
private static void doSnapshotRootRemovalAssertions(Path path,
Path snapshotPath) throws Exception {
AclStatus s = hdfs.getAclStatus(path);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] { }, returned);
assertPermission((short)0700, path);
s = hdfs.getAclStatus(snapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02750, snapshotPath);
assertDirPermissionDenied(fsAsBruce, BRUCE, path);
assertDirPermissionDenied(fsAsDiana, DIANA, path);
assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
}
@Test
public void testOriginalAclEnforcedForSnapshotContentsAfterRemoval()
throws Exception {
Path filePath = new Path(path, "file1");
Path subdirPath = new Path(path, "subdir1");
Path fileSnapshotPath = new Path(snapshotPath, "file1");
Path subdirSnapshotPath = new Path(snapshotPath, "subdir1");
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0777));
FileSystem.create(hdfs, filePath, FsPermission.createImmutable((short)0600))
.close();
FileSystem.mkdirs(hdfs, subdirPath, FsPermission.createImmutable(
(short)0700));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, READ_EXECUTE),
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE),
aclEntry(ACCESS, OTHER, NONE));
hdfs.setAcl(filePath, aclSpec);
hdfs.setAcl(subdirPath, aclSpec);
assertFilePermissionGranted(fsAsBruce, BRUCE, filePath);
assertFilePermissionDenied(fsAsDiana, DIANA, filePath);
assertDirPermissionGranted(fsAsBruce, BRUCE, subdirPath);
assertDirPermissionDenied(fsAsDiana, DIANA, subdirPath);
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
// Both original and snapshot still have same ACL.
AclEntry[] expected = new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) };
AclStatus s = hdfs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, filePath);
s = hdfs.getAclStatus(subdirPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, subdirPath);
s = hdfs.getAclStatus(fileSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, fileSnapshotPath);
assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
s = hdfs.getAclStatus(subdirSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, subdirSnapshotPath);
assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
hdfs.removeAcl(filePath);
hdfs.removeAcl(subdirPath);
// Original has changed, but snapshot still has old ACL.
doSnapshotContentsRemovalAssertions(filePath, fileSnapshotPath, subdirPath,
subdirSnapshotPath);
restart(false);
doSnapshotContentsRemovalAssertions(filePath, fileSnapshotPath, subdirPath,
subdirSnapshotPath);
restart(true);
doSnapshotContentsRemovalAssertions(filePath, fileSnapshotPath, subdirPath,
subdirSnapshotPath);
}
private static void doSnapshotContentsRemovalAssertions(Path filePath,
Path fileSnapshotPath, Path subdirPath, Path subdirSnapshotPath)
throws Exception {
AclEntry[] expected = new AclEntry[] { };
AclStatus s = hdfs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)0500, filePath);
assertFilePermissionDenied(fsAsBruce, BRUCE, filePath);
assertFilePermissionDenied(fsAsDiana, DIANA, filePath);
s = hdfs.getAclStatus(subdirPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)0500, subdirPath);
assertDirPermissionDenied(fsAsBruce, BRUCE, subdirPath);
assertDirPermissionDenied(fsAsDiana, DIANA, subdirPath);
expected = new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) };
s = hdfs.getAclStatus(fileSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, fileSnapshotPath);
assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
s = hdfs.getAclStatus(subdirSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02550, subdirSnapshotPath);
assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
}
@Test
public void testModifyReadsCurrentState() throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, "bruce", ALL));
hdfs.modifyAclEntries(path, aclSpec);
aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, "diana", READ_EXECUTE));
hdfs.modifyAclEntries(path, aclSpec);
AclEntry[] expected = new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", ALL),
aclEntry(ACCESS, USER, "diana", READ_EXECUTE),
aclEntry(ACCESS, GROUP, NONE) };
AclStatus s = hdfs.getAclStatus(path);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)02770, path);
assertDirPermissionGranted(fsAsBruce, BRUCE, path);
assertDirPermissionGranted(fsAsDiana, DIANA, path);
}
@Test
public void testRemoveReadsCurrentState() throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, "bruce", ALL));
hdfs.modifyAclEntries(path, aclSpec);
hdfs.removeAcl(path);
AclEntry[] expected = new AclEntry[] { };
AclStatus s = hdfs.getAclStatus(path);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission((short)0700, path);
assertDirPermissionDenied(fsAsBruce, BRUCE, path);
assertDirPermissionDenied(fsAsDiana, DIANA, path);
}
@Test
public void testDefaultAclNotCopiedToAccessAclOfNewSnapshot()
throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "bruce", READ_EXECUTE));
hdfs.modifyAclEntries(path, aclSpec);
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
AclStatus s = hdfs.getAclStatus(path);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, USER, "bruce", READ_EXECUTE),
aclEntry(DEFAULT, GROUP, NONE),
aclEntry(DEFAULT, MASK, READ_EXECUTE),
aclEntry(DEFAULT, OTHER, NONE) }, returned);
assertPermission((short)02700, path);
s = hdfs.getAclStatus(snapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, USER, "bruce", READ_EXECUTE),
aclEntry(DEFAULT, GROUP, NONE),
aclEntry(DEFAULT, MASK, READ_EXECUTE),
aclEntry(DEFAULT, OTHER, NONE) }, returned);
assertPermission((short)02700, snapshotPath);
assertDirPermissionDenied(fsAsBruce, BRUCE, snapshotPath);
}
@Test
public void testModifyAclEntriesSnapshotPath() throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "bruce", READ_EXECUTE));
exception.expect(SnapshotAccessControlException.class);
hdfs.modifyAclEntries(snapshotPath, aclSpec);
}
@Test
public void testRemoveAclEntriesSnapshotPath() throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "bruce"));
exception.expect(SnapshotAccessControlException.class);
hdfs.removeAclEntries(snapshotPath, aclSpec);
}
@Test
public void testRemoveDefaultAclSnapshotPath() throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
exception.expect(SnapshotAccessControlException.class);
hdfs.removeDefaultAcl(snapshotPath);
}
@Test
public void testRemoveAclSnapshotPath() throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
exception.expect(SnapshotAccessControlException.class);
hdfs.removeAcl(snapshotPath);
}
@Test
public void testSetAclSnapshotPath() throws Exception {
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "bruce"));
exception.expect(SnapshotAccessControlException.class);
hdfs.setAcl(snapshotPath, aclSpec);
}
@Test
public void testChangeAclExceedsQuota() throws Exception {
Path filePath = new Path(path, "file1");
Path fileSnapshotPath = new Path(snapshotPath, "file1");
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0755));
hdfs.allowSnapshot(path);
hdfs.setQuota(path, 3, HdfsConstants.QUOTA_DONT_SET);
FileSystem.create(hdfs, filePath, FsPermission.createImmutable((short)0600))
.close();
hdfs.setPermission(filePath, FsPermission.createImmutable((short)0600));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, "bruce", READ_WRITE));
hdfs.modifyAclEntries(filePath, aclSpec);
hdfs.createSnapshot(path, snapshotName);
AclStatus s = hdfs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_WRITE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02660, filePath);
s = hdfs.getAclStatus(fileSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_WRITE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02660, filePath);
aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, "bruce", READ));
exception.expect(NSQuotaExceededException.class);
hdfs.modifyAclEntries(filePath, aclSpec);
}
@Test
public void testRemoveAclExceedsQuota() throws Exception {
Path filePath = new Path(path, "file1");
Path fileSnapshotPath = new Path(snapshotPath, "file1");
FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0755));
hdfs.allowSnapshot(path);
hdfs.setQuota(path, 3, HdfsConstants.QUOTA_DONT_SET);
FileSystem.create(hdfs, filePath, FsPermission.createImmutable((short)0600))
.close();
hdfs.setPermission(filePath, FsPermission.createImmutable((short)0600));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, "bruce", READ_WRITE));
hdfs.modifyAclEntries(filePath, aclSpec);
hdfs.createSnapshot(path, snapshotName);
AclStatus s = hdfs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_WRITE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02660, filePath);
s = hdfs.getAclStatus(fileSnapshotPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "bruce", READ_WRITE),
aclEntry(ACCESS, GROUP, NONE) }, returned);
assertPermission((short)02660, filePath);
aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, "bruce", READ));
exception.expect(NSQuotaExceededException.class);
hdfs.removeAcl(filePath);
}
@Test
public void testGetAclStatusDotSnapshotPath() throws Exception {
hdfs.mkdirs(path);
SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
AclStatus s = hdfs.getAclStatus(new Path(path, ".snapshot"));
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] { }, returned);
}
/**
* Asserts that permission is denied to the given fs/user for the given
* directory.
*
* @param fs FileSystem to check
* @param user UserGroupInformation owner of fs
* @param pathToCheck Path directory to check
* @throws Exception if there is an unexpected error
*/
private static void assertDirPermissionDenied(FileSystem fs,
UserGroupInformation user, Path pathToCheck) throws Exception {
try {
fs.listStatus(pathToCheck);
fail("expected AccessControlException for user " + user + ", path = " +
pathToCheck);
} catch (AccessControlException e) {
// expected
}
}
/**
* Asserts that permission is granted to the given fs/user for the given
* directory.
*
* @param fs FileSystem to check
* @param user UserGroupInformation owner of fs
* @param pathToCheck Path directory to check
* @throws Exception if there is an unexpected error
*/
private static void assertDirPermissionGranted(FileSystem fs,
UserGroupInformation user, Path pathToCheck) throws Exception {
try {
fs.listStatus(pathToCheck);
} catch (AccessControlException e) {
fail("expected permission granted for user " + user + ", path = " +
pathToCheck);
}
}
/**
* Asserts that permission is denied to the given fs/user for the given file.
*
* @param fs FileSystem to check
* @param user UserGroupInformation owner of fs
* @param pathToCheck Path file to check
* @throws Exception if there is an unexpected error
*/
private static void assertFilePermissionDenied(FileSystem fs,
UserGroupInformation user, Path pathToCheck) throws Exception {
try {
fs.open(pathToCheck).close();
fail("expected AccessControlException for user " + user + ", path = " +
pathToCheck);
} catch (AccessControlException e) {
// expected
}
}
/**
* Asserts that permission is granted to the given fs/user for the given file.
*
* @param fs FileSystem to check
* @param user UserGroupInformation owner of fs
* @param pathToCheck Path file to check
* @throws Exception if there is an unexpected error
*/
private static void assertFilePermissionGranted(FileSystem fs,
UserGroupInformation user, Path pathToCheck) throws Exception {
try {
fs.open(pathToCheck).close();
} catch (AccessControlException e) {
fail("expected permission granted for user " + user + ", path = " +
pathToCheck);
}
}
/**
* Asserts the value of the FsPermission bits on the inode of the test path.
*
* @param perm short expected permission bits
* @param pathToCheck Path to check
* @throws Exception thrown if there is an unexpected error
*/
private static void assertPermission(short perm, Path pathToCheck)
throws Exception {
assertEquals(FsPermission.createImmutable(perm),
hdfs.getFileStatus(pathToCheck).getPermission());
}
/**
* Initialize the cluster, wait for it to become active, and get FileSystem
* instances for our test users.
*
* @param format if true, format the NameNode and DataNodes before starting up
* @throws Exception if any step fails
*/
private static void initCluster(boolean format) throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(format)
.build();
cluster.waitActive();
hdfs = cluster.getFileSystem();
fsAsBruce = DFSTestUtil.getFileSystemAs(BRUCE, conf);
fsAsDiana = DFSTestUtil.getFileSystemAs(DIANA, conf);
}
/**
* Restart the cluster, optionally saving a new checkpoint.
*
* @param checkpoint boolean true to save a new checkpoint
* @throws Exception if restart fails
*/
private static void restart(boolean checkpoint) throws Exception {
NameNode nameNode = cluster.getNameNode();
if (checkpoint) {
NameNodeAdapter.enterSafeMode(nameNode, false);
NameNodeAdapter.saveNamespace(nameNode);
}
shutdown();
initCluster(false);
}
}

View File

@ -305,7 +305,8 @@ public class TestDiff {
final int i = Diff.search(current, inode.getKey());
Assert.assertTrue(i >= 0);
final INodeDirectory oldinode = (INodeDirectory)current.get(i);
final INodeDirectory newinode = new INodeDirectory(oldinode, false, true);
final INodeDirectory newinode = new INodeDirectory(oldinode, false,
oldinode.getFeatures());
newinode.setModificationTime(oldinode.getModificationTime() + 1);
current.set(i, newinode);