HDFS-5616. NameNode: implement default ACL handling. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1565845 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-02-07 23:11:27 +00:00
parent 0abf8e58b7
commit c89c516b95
11 changed files with 719 additions and 82 deletions

View File

@ -62,6 +62,8 @@ HDFS-4685 (Unreleased)
HDFS-5861. Add CLI test for Ls output for extended ACL marker. HDFS-5861. Add CLI test for Ls output for extended ACL marker.
(Vinay via cnauroth) (Vinay via cnauroth)
HDFS-5616. NameNode: implement default ACL handling. (cnauroth)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -59,6 +59,89 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@InterfaceAudience.Private @InterfaceAudience.Private
final class AclStorage { final class AclStorage {
/**
* If a default ACL is defined on a parent directory, then copies that default
* ACL to a newly created child file or directory.
*
* @param child INode newly created child
*/
public static void copyINodeDefaultAcl(INode child) {
INodeDirectory parent = child.getParent();
if (!parent.getFsPermission().getAclBit()) {
return;
}
// The default ACL is applicable to new child files and directories only.
if (!child.isFile() && !child.isDirectory()) {
return;
}
// Split parent's entries into access vs. default.
List<AclEntry> featureEntries = parent.getAclFeature().getEntries();
ScopedAclEntries scopedEntries = new ScopedAclEntries(featureEntries);
List<AclEntry> parentDefaultEntries = scopedEntries.getDefaultEntries();
// The parent may have an access ACL but no default ACL. If so, exit.
if (parentDefaultEntries.isEmpty()) {
return;
}
// Pre-allocate list size for access entries to copy from parent.
List<AclEntry> accessEntries = Lists.newArrayListWithCapacity(
parentDefaultEntries.size());
FsPermission childPerm = child.getFsPermission();
// Copy each default ACL entry from parent to new child's access ACL.
boolean parentDefaultIsMinimal = isMinimalAcl(parentDefaultEntries);
for (AclEntry entry: parentDefaultEntries) {
AclEntryType type = entry.getType();
String name = entry.getName();
AclEntry.Builder builder = new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(type)
.setName(name);
// The child's initial permission bits are treated as the mode parameter,
// which can filter copied permission values for owner, mask and other.
final FsAction permission;
if (type == AclEntryType.USER && name == null) {
permission = entry.getPermission().and(childPerm.getUserAction());
} else if (type == AclEntryType.GROUP && parentDefaultIsMinimal) {
// This only happens if the default ACL is a minimal ACL: exactly 3
// entries corresponding to owner, group and other. In this case,
// filter the group permissions.
permission = entry.getPermission().and(childPerm.getGroupAction());
} else if (type == AclEntryType.MASK) {
// Group bits from mode parameter filter permission of mask entry.
permission = entry.getPermission().and(childPerm.getGroupAction());
} else if (type == AclEntryType.OTHER) {
permission = entry.getPermission().and(childPerm.getOtherAction());
} else {
permission = entry.getPermission();
}
builder.setPermission(permission);
accessEntries.add(builder.build());
}
// A new directory also receives a copy of the parent's default ACL.
List<AclEntry> defaultEntries = child.isDirectory() ? parentDefaultEntries :
Collections.<AclEntry>emptyList();
final FsPermission newPerm;
if (!isMinimalAcl(accessEntries) || !defaultEntries.isEmpty()) {
// Save the new ACL to the child.
child.addAclFeature(createAclFeature(accessEntries, defaultEntries));
newPerm = createFsPermissionForExtendedAcl(accessEntries, childPerm);
} else {
// The child is receiving a minimal ACL.
newPerm = createFsPermissionForMinimalAcl(accessEntries, childPerm);
}
child.setPermission(newPerm);
}
/** /**
* Reads the existing extended ACL entries of an inode. This method returns * Reads the existing extended ACL entries of an inode. This method returns
* only the extended ACL entries stored in the AclFeature. If the inode does * only the extended ACL entries stored in the AclFeature. If the inode does
@ -201,7 +284,7 @@ final class AclStorage {
assert newAcl.size() >= 3; assert newAcl.size() >= 3;
FsPermission perm = inode.getFsPermission(); FsPermission perm = inode.getFsPermission();
final FsPermission newPerm; final FsPermission newPerm;
if (newAcl.size() > 3) { if (!isMinimalAcl(newAcl)) {
// This is an extended ACL. Split entries into access vs. default. // This is an extended ACL. Split entries into access vs. default.
ScopedAclEntries scoped = new ScopedAclEntries(newAcl); ScopedAclEntries scoped = new ScopedAclEntries(newAcl);
List<AclEntry> accessEntries = scoped.getAccessEntries(); List<AclEntry> accessEntries = scoped.getAccessEntries();
@ -213,49 +296,19 @@ final class AclStorage {
"Invalid ACL: only directories may have a default ACL."); "Invalid ACL: only directories may have a default ACL.");
} }
// Pre-allocate list size for the explicit entries stored in the feature,
// which is all entries minus the 3 entries implicitly stored in the
// permission bits.
List<AclEntry> featureEntries = Lists.newArrayListWithCapacity(
(accessEntries.size() - 3) + defaultEntries.size());
// Calculate new permission bits. For a correctly sorted ACL, the first
// entry is the owner and the last 2 entries are the mask and other entries
// respectively. Also preserve sticky bit and toggle ACL bit on.
newPerm = new FsPermission(accessEntries.get(0).getPermission(),
accessEntries.get(accessEntries.size() - 2).getPermission(),
accessEntries.get(accessEntries.size() - 1).getPermission(),
perm.getStickyBit(), true);
// For the access ACL, the feature only needs to hold the named user and
// group entries. For a correctly sorted ACL, these will be in a
// predictable range.
if (accessEntries.size() > 3) {
featureEntries.addAll(
accessEntries.subList(1, accessEntries.size() - 2));
}
// Add all default entries to the feature.
featureEntries.addAll(defaultEntries);
// Attach entries to the feature. // Attach entries to the feature.
if (perm.getAclBit()) { if (perm.getAclBit()) {
inode.removeAclFeature(snapshotId); inode.removeAclFeature(snapshotId);
} }
inode.addAclFeature(new AclFeature(featureEntries), snapshotId); inode.addAclFeature(createAclFeature(accessEntries, defaultEntries),
snapshotId);
newPerm = createFsPermissionForExtendedAcl(accessEntries, perm);
} else { } else {
// This is a minimal ACL. Remove the ACL feature if it previously had one. // This is a minimal ACL. Remove the ACL feature if it previously had one.
if (perm.getAclBit()) { if (perm.getAclBit()) {
inode.removeAclFeature(snapshotId); inode.removeAclFeature(snapshotId);
} }
newPerm = createFsPermissionForMinimalAcl(newAcl, perm);
// Calculate new permission bits. For a correctly sorted ACL, the owner,
// group and other permissions are in order. Also preserve sticky bit and
// toggle ACL bit off.
newPerm = new FsPermission(newAcl.get(0).getPermission(),
newAcl.get(1).getPermission(),
newAcl.get(2).getPermission(),
perm.getStickyBit(), false);
} }
inode.setPermission(newPerm, snapshotId); inode.setPermission(newPerm, snapshotId);
@ -267,6 +320,70 @@ final class AclStorage {
private AclStorage() { private AclStorage() {
} }
/**
* Creates an AclFeature from the given ACL entries.
*
* @param accessEntries List<AclEntry> access ACL entries
* @param defaultEntries List<AclEntry> default ACL entries
* @return AclFeature containing the required ACL entries
*/
private static AclFeature createAclFeature(List<AclEntry> accessEntries,
List<AclEntry> defaultEntries) {
// Pre-allocate list size for the explicit entries stored in the feature,
// which is all entries minus the 3 entries implicitly stored in the
// permission bits.
List<AclEntry> featureEntries = Lists.newArrayListWithCapacity(
(accessEntries.size() - 3) + defaultEntries.size());
// For the access ACL, the feature only needs to hold the named user and
// group entries. For a correctly sorted ACL, these will be in a
// predictable range.
if (!isMinimalAcl(accessEntries)) {
featureEntries.addAll(
accessEntries.subList(1, accessEntries.size() - 2));
}
// Add all default entries to the feature.
featureEntries.addAll(defaultEntries);
return new AclFeature(Collections.unmodifiableList(featureEntries));
}
/**
* Creates the new FsPermission for an inode that is receiving an extended
* ACL, based on its access ACL entries. For a correctly sorted ACL, the
* first entry is the owner and the last 2 entries are the mask and other
* entries respectively. Also preserve sticky bit and toggle ACL bit on.
*
* @param accessEntries List<AclEntry> access ACL entries
* @param existingPerm FsPermission existing permissions
* @return FsPermission new permissions
*/
private static FsPermission createFsPermissionForExtendedAcl(
List<AclEntry> accessEntries, FsPermission existingPerm) {
return new FsPermission(accessEntries.get(0).getPermission(),
accessEntries.get(accessEntries.size() - 2).getPermission(),
accessEntries.get(accessEntries.size() - 1).getPermission(),
existingPerm.getStickyBit(), true);
}
/**
* Creates the new FsPermission for an inode that is receiving a minimal ACL,
* based on its access ACL entries. For a correctly sorted ACL, the owner,
* group and other permissions are in order. Also preserve sticky bit and
* toggle ACL bit off.
*
* @param accessEntries List<AclEntry> access ACL entries
* @param existingPerm FsPermission existing permissions
* @return FsPermission new permissions
*/
private static FsPermission createFsPermissionForMinimalAcl(
List<AclEntry> accessEntries, FsPermission existingPerm) {
return new FsPermission(accessEntries.get(0).getPermission(),
accessEntries.get(1).getPermission(),
accessEntries.get(2).getPermission(),
existingPerm.getStickyBit(), false);
}
/** /**
* Translates the given permission bits to the equivalent minimal ACL. * Translates the given permission bits to the equivalent minimal ACL.
* *
@ -292,4 +409,15 @@ final class AclStorage {
.setPermission(perm.getOtherAction()) .setPermission(perm.getOtherAction())
.build()); .build());
} }
/**
* Checks if the given entries represent a minimal ACL (contains exactly 3
* entries).
*
* @param entries List<AclEntry> entries to check
* @return boolean true if the entries represent a minimal ACL
*/
private static boolean isMinimalAcl(List<AclEntry> entries) {
return entries.size() == 3;
}
} }

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -285,7 +286,7 @@ public class FSDirectory implements Closeable {
short replication, long preferredBlockSize, String clientName, short replication, long preferredBlockSize, String clientName,
String clientMachine, DatanodeDescriptor clientNode) String clientMachine, DatanodeDescriptor clientNode)
throws FileAlreadyExistsException, QuotaExceededException, throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException { UnresolvedLinkException, SnapshotAccessControlException, AclException {
waitForReady(); waitForReady();
// Always do an implicit mkdirs for parent directory tree. // Always do an implicit mkdirs for parent directory tree.
@ -327,6 +328,7 @@ public class FSDirectory implements Closeable {
INodeFile unprotectedAddFile( long id, INodeFile unprotectedAddFile( long id,
String path, String path,
PermissionStatus permissions, PermissionStatus permissions,
List<AclEntry> aclEntries,
short replication, short replication,
long modificationTime, long modificationTime,
long atime, long atime,
@ -349,6 +351,10 @@ public class FSDirectory implements Closeable {
try { try {
if (addINode(path, newNode)) { if (addINode(path, newNode)) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(newNode, aclEntries,
Snapshot.CURRENT_STATE_ID);
}
return newNode; return newNode;
} }
} catch (IOException e) { } catch (IOException e) {
@ -1927,7 +1933,8 @@ public class FSDirectory implements Closeable {
boolean mkdirs(String src, PermissionStatus permissions, boolean mkdirs(String src, PermissionStatus permissions,
boolean inheritPermission, long now) boolean inheritPermission, long now)
throws FileAlreadyExistsException, QuotaExceededException, throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException { UnresolvedLinkException, SnapshotAccessControlException,
AclException {
src = normalizePath(src); src = normalizePath(src);
String[] names = INode.getPathNames(src); String[] names = INode.getPathNames(src);
byte[][] components = INode.getPathComponents(names); byte[][] components = INode.getPathComponents(names);
@ -1990,7 +1997,7 @@ public class FSDirectory implements Closeable {
pathbuilder.append(Path.SEPARATOR + names[i]); pathbuilder.append(Path.SEPARATOR + names[i]);
unprotectedMkdir(namesystem.allocateNewInodeId(), iip, i, unprotectedMkdir(namesystem.allocateNewInodeId(), iip, i,
components[i], (i < lastInodeIndex) ? parentPermissions components[i], (i < lastInodeIndex) ? parentPermissions
: permissions, now); : permissions, null, now);
if (inodes[i] == null) { if (inodes[i] == null) {
return false; return false;
} }
@ -2013,14 +2020,14 @@ public class FSDirectory implements Closeable {
} }
INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions, INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions,
long timestamp) throws QuotaExceededException, List<AclEntry> aclEntries, long timestamp)
UnresolvedLinkException { throws QuotaExceededException, UnresolvedLinkException, AclException {
assert hasWriteLock(); assert hasWriteLock();
byte[][] components = INode.getPathComponents(src); byte[][] components = INode.getPathComponents(src);
INodesInPath iip = getExistingPathINodes(components); INodesInPath iip = getExistingPathINodes(components);
INode[] inodes = iip.getINodes(); INode[] inodes = iip.getINodes();
final int pos = inodes.length - 1; final int pos = inodes.length - 1;
unprotectedMkdir(inodeId, iip, pos, components[pos], permissions, unprotectedMkdir(inodeId, iip, pos, components[pos], permissions, aclEntries,
timestamp); timestamp);
return inodes[pos]; return inodes[pos];
} }
@ -2030,12 +2037,16 @@ public class FSDirectory implements Closeable {
* All ancestors exist. Newly created one stored at index pos. * All ancestors exist. Newly created one stored at index pos.
*/ */
private void unprotectedMkdir(long inodeId, INodesInPath inodesInPath, private void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
int pos, byte[] name, PermissionStatus permission, long timestamp) int pos, byte[] name, PermissionStatus permission,
throws QuotaExceededException { List<AclEntry> aclEntries, long timestamp)
throws QuotaExceededException, AclException {
assert hasWriteLock(); assert hasWriteLock();
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission, final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
timestamp); timestamp);
if (addChild(inodesInPath, pos, dir, true)) { if (addChild(inodesInPath, pos, dir, true)) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
}
inodesInPath.setINode(pos, dir); inodesInPath.setINode(pos, dir);
} }
} }
@ -2260,6 +2271,7 @@ public class FSDirectory implements Closeable {
-counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE)); -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
} else { } else {
iip.setINode(pos - 1, child.getParent()); iip.setINode(pos - 1, child.getParent());
AclStorage.copyINodeDefaultAcl(child);
addToInodeMap(child); addToInodeMap(child);
} }
return added; return added;
@ -2645,7 +2657,7 @@ public class FSDirectory implements Closeable {
INodeSymlink addSymlink(String path, String target, INodeSymlink addSymlink(String path, String target,
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache) PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws UnresolvedLinkException, FileAlreadyExistsException, throws UnresolvedLinkException, FileAlreadyExistsException,
QuotaExceededException, SnapshotAccessControlException { QuotaExceededException, SnapshotAccessControlException, AclException {
waitForReady(); waitForReady();
final long modTime = now(); final long modTime = now();

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
@ -685,6 +686,7 @@ public class FSEditLog implements LogsPurgeable {
*/ */
public void logOpenFile(String path, INodeFile newNode, boolean toLogRpcIds) { public void logOpenFile(String path, INodeFile newNode, boolean toLogRpcIds) {
Preconditions.checkArgument(newNode.isUnderConstruction()); Preconditions.checkArgument(newNode.isUnderConstruction());
PermissionStatus permissions = newNode.getPermissionStatus();
AddOp op = AddOp.getInstance(cache.get()) AddOp op = AddOp.getInstance(cache.get())
.setInodeId(newNode.getId()) .setInodeId(newNode.getId())
.setPath(path) .setPath(path)
@ -693,7 +695,9 @@ public class FSEditLog implements LogsPurgeable {
.setAccessTime(newNode.getAccessTime()) .setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize()) .setBlockSize(newNode.getPreferredBlockSize())
.setBlocks(newNode.getBlocks()) .setBlocks(newNode.getBlocks())
.setPermissionStatus(newNode.getPermissionStatus()) .setPermissionStatus(permissions)
.setAclEntries(permissions.getPermission().getAclBit() ?
AclStorage.readINodeLogicalAcl(newNode) : null)
.setClientName(newNode.getFileUnderConstructionFeature().getClientName()) .setClientName(newNode.getFileUnderConstructionFeature().getClientName())
.setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine()); .setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine());
logRpcIds(op, toLogRpcIds); logRpcIds(op, toLogRpcIds);
@ -740,11 +744,14 @@ public class FSEditLog implements LogsPurgeable {
* Add create directory record to edit log * Add create directory record to edit log
*/ */
public void logMkDir(String path, INode newNode) { public void logMkDir(String path, INode newNode) {
PermissionStatus permissions = newNode.getPermissionStatus();
MkdirOp op = MkdirOp.getInstance(cache.get()) MkdirOp op = MkdirOp.getInstance(cache.get())
.setInodeId(newNode.getId()) .setInodeId(newNode.getId())
.setPath(path) .setPath(path)
.setTimestamp(newNode.getModificationTime()) .setTimestamp(newNode.getModificationTime())
.setPermissionStatus(newNode.getPermissionStatus()); .setPermissionStatus(permissions)
.setAclEntries(permissions.getPermission().getAclBit() ?
AclStorage.readINodeLogicalAcl(newNode) : null);
logEdit(op); logEdit(op);
} }

View File

@ -320,9 +320,10 @@ public class FSEditLogLoader {
inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion, inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
lastInodeId); lastInodeId);
newFile = fsDir.unprotectedAddFile(inodeId, newFile = fsDir.unprotectedAddFile(inodeId,
addCloseOp.path, addCloseOp.permissions, replication, addCloseOp.path, addCloseOp.permissions, addCloseOp.aclEntries,
addCloseOp.mtime, addCloseOp.atime, addCloseOp.blockSize, true, replication, addCloseOp.mtime, addCloseOp.atime,
addCloseOp.clientName, addCloseOp.clientMachine); addCloseOp.blockSize, true, addCloseOp.clientName,
addCloseOp.clientMachine);
fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path); fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
// add the op into retry cache if necessary // add the op into retry cache if necessary
@ -468,7 +469,7 @@ public class FSEditLogLoader {
inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion, inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
lastInodeId); lastInodeId);
fsDir.unprotectedMkdir(inodeId, mkdirOp.path, mkdirOp.permissions, fsDir.unprotectedMkdir(inodeId, mkdirOp.path, mkdirOp.permissions,
mkdirOp.timestamp); mkdirOp.aclEntries, mkdirOp.timestamp);
break; break;
} }
case OP_SET_GENSTAMP_V1: { case OP_SET_GENSTAMP_V1: {

View File

@ -93,6 +93,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEditLogProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEditLogProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclFsImageProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils; import org.apache.hadoop.hdfs.util.XMLUtils;
@ -308,6 +309,7 @@ public abstract class FSEditLogOp {
long blockSize; long blockSize;
Block[] blocks; Block[] blocks;
PermissionStatus permissions; PermissionStatus permissions;
List<AclEntry> aclEntries;
String clientName; String clientName;
String clientMachine; String clientMachine;
@ -370,6 +372,11 @@ public abstract class FSEditLogOp {
return (T)this; return (T)this;
} }
<T extends AddCloseOp> T setAclEntries(List<AclEntry> aclEntries) {
this.aclEntries = aclEntries;
return (T)this;
}
<T extends AddCloseOp> T setClientName(String clientName) { <T extends AddCloseOp> T setClientName(String clientName) {
this.clientName = clientName; this.clientName = clientName;
return (T)this; return (T)this;
@ -392,6 +399,13 @@ public abstract class FSEditLogOp {
permissions.write(out); permissions.write(out);
if (this.opCode == OP_ADD) { if (this.opCode == OP_ADD) {
if (permissions.getPermission().getAclBit()) {
AclFsImageProto.newBuilder()
.addAllEntries(PBHelper.convertAclEntryProto(aclEntries))
.build()
.writeDelimitedTo(out);
}
FSImageSerialization.writeString(clientName,out); FSImageSerialization.writeString(clientName,out);
FSImageSerialization.writeString(clientMachine,out); FSImageSerialization.writeString(clientMachine,out);
// write clientId and callId // write clientId and callId
@ -450,6 +464,14 @@ public abstract class FSEditLogOp {
// clientname, clientMachine and block locations of last block. // clientname, clientMachine and block locations of last block.
if (this.opCode == OP_ADD) { if (this.opCode == OP_ADD) {
if (permissions.getPermission().getAclBit()) {
aclEntries = PBHelper.convertAclEntry(
AclFsImageProto.parseDelimitedFrom((DataInputStream)in)
.getEntriesList());
} else {
aclEntries = null;
}
this.clientName = FSImageSerialization.readString(in); this.clientName = FSImageSerialization.readString(in);
this.clientMachine = FSImageSerialization.readString(in); this.clientMachine = FSImageSerialization.readString(in);
// read clientId and callId // read clientId and callId
@ -501,6 +523,8 @@ public abstract class FSEditLogOp {
builder.append(Arrays.toString(blocks)); builder.append(Arrays.toString(blocks));
builder.append(", permissions="); builder.append(", permissions=");
builder.append(permissions); builder.append(permissions);
builder.append(", aclEntries=");
builder.append(aclEntries);
builder.append(", clientName="); builder.append(", clientName=");
builder.append(clientName); builder.append(clientName);
builder.append(", clientMachine="); builder.append(", clientMachine=");
@ -538,6 +562,9 @@ public abstract class FSEditLogOp {
} }
FSEditLogOp.permissionStatusToXml(contentHandler, permissions); FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
if (this.opCode == OP_ADD) { if (this.opCode == OP_ADD) {
if (permissions.getPermission().getAclBit()) {
appendAclEntriesToXml(contentHandler, aclEntries);
}
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
} }
} }
@ -563,6 +590,11 @@ public abstract class FSEditLogOp {
this.blocks = new Block[0]; this.blocks = new Block[0];
} }
this.permissions = permissionStatusFromXml(st); this.permissions = permissionStatusFromXml(st);
if (permissions.getPermission().getAclBit()) {
aclEntries = readAclEntriesFromXml(st);
} else {
aclEntries = null;
}
readRpcIdsFromXml(st); readRpcIdsFromXml(st);
} }
} }
@ -1229,6 +1261,7 @@ public abstract class FSEditLogOp {
String path; String path;
long timestamp; long timestamp;
PermissionStatus permissions; PermissionStatus permissions;
List<AclEntry> aclEntries;
private MkdirOp() { private MkdirOp() {
super(OP_MKDIR); super(OP_MKDIR);
@ -1258,6 +1291,11 @@ public abstract class FSEditLogOp {
return this; return this;
} }
MkdirOp setAclEntries(List<AclEntry> aclEntries) {
this.aclEntries = aclEntries;
return this;
}
@Override @Override
public public
void writeFields(DataOutputStream out) throws IOException { void writeFields(DataOutputStream out) throws IOException {
@ -1266,6 +1304,12 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeLong(timestamp, out); // mtime FSImageSerialization.writeLong(timestamp, out); // mtime
FSImageSerialization.writeLong(timestamp, out); // atime, unused at this FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
permissions.write(out); permissions.write(out);
if (permissions.getPermission().getAclBit()) {
AclFsImageProto.newBuilder()
.addAllEntries(PBHelper.convertAclEntryProto(aclEntries))
.build()
.writeDelimitedTo(out);
}
} }
@Override @Override
@ -1303,6 +1347,13 @@ public abstract class FSEditLogOp {
} }
this.permissions = PermissionStatus.read(in); this.permissions = PermissionStatus.read(in);
if (permissions.getPermission().getAclBit()) {
aclEntries = PBHelper.convertAclEntry(
AclFsImageProto.parseDelimitedFrom((DataInputStream)in)
.getEntriesList());
} else {
aclEntries = null;
}
} }
@Override @Override
@ -1318,6 +1369,8 @@ public abstract class FSEditLogOp {
builder.append(timestamp); builder.append(timestamp);
builder.append(", permissions="); builder.append(", permissions=");
builder.append(permissions); builder.append(permissions);
builder.append(", aclEntries=");
builder.append(aclEntries);
builder.append(", opCode="); builder.append(", opCode=");
builder.append(opCode); builder.append(opCode);
builder.append(", txid="); builder.append(", txid=");
@ -1336,6 +1389,9 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "TIMESTAMP", XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.valueOf(timestamp).toString()); Long.valueOf(timestamp).toString());
FSEditLogOp.permissionStatusToXml(contentHandler, permissions); FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
if (permissions.getPermission().getAclBit()) {
appendAclEntriesToXml(contentHandler, aclEntries);
}
} }
@Override void fromXml(Stanza st) throws InvalidXmlException { @Override void fromXml(Stanza st) throws InvalidXmlException {
@ -1344,6 +1400,11 @@ public abstract class FSEditLogOp {
this.path = st.getValue("PATH"); this.path = st.getValue("PATH");
this.timestamp = Long.valueOf(st.getValue("TIMESTAMP")); this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
this.permissions = permissionStatusFromXml(st); this.permissions = permissionStatusFromXml(st);
if (permissions.getPermission().getAclBit()) {
aclEntries = readAclEntriesFromXml(st);
} else {
aclEntries = null;
}
} }
} }
@ -3387,31 +3448,13 @@ public abstract class FSEditLogOp {
@Override @Override
protected void toXml(ContentHandler contentHandler) throws SAXException { protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SRC", src); XMLUtils.addSaxString(contentHandler, "SRC", src);
for (AclEntry e : aclEntries) { appendAclEntriesToXml(contentHandler, aclEntries);
contentHandler.startElement("", "", "ENTRY", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "SCOPE", e.getScope().name());
XMLUtils.addSaxString(contentHandler, "TYPE", e.getType().name());
XMLUtils.addSaxString(contentHandler, "NAME", e.getName());
fsActionToXml(contentHandler, e.getPermission());
contentHandler.endElement("", "", "ENTRY");
}
} }
@Override @Override
void fromXml(Stanza st) throws InvalidXmlException { void fromXml(Stanza st) throws InvalidXmlException {
src = st.getValue("SRC"); src = st.getValue("SRC");
if (!st.hasChildren("ENTRY")) aclEntries = readAclEntriesFromXml(st);
return;
List<Stanza> stanzas = st.getChildren("ENTRY");
for (Stanza s : stanzas) {
AclEntry e = new AclEntry.Builder()
.setScope(AclEntryScope.valueOf(s.getValue("SCOPE")))
.setType(AclEntryType.valueOf(s.getValue("TYPE")))
.setName(s.getValue("NAME"))
.setPermission(fsActionFromXml(s)).build();
aclEntries.add(e);
}
} }
} }
@ -3836,4 +3879,33 @@ public abstract class FSEditLogOp {
throw new InvalidXmlException("Invalid value for FsAction"); throw new InvalidXmlException("Invalid value for FsAction");
return v; return v;
} }
private static void appendAclEntriesToXml(ContentHandler contentHandler,
List<AclEntry> aclEntries) throws SAXException {
for (AclEntry e : aclEntries) {
contentHandler.startElement("", "", "ENTRY", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "SCOPE", e.getScope().name());
XMLUtils.addSaxString(contentHandler, "TYPE", e.getType().name());
XMLUtils.addSaxString(contentHandler, "NAME", e.getName());
fsActionToXml(contentHandler, e.getPermission());
contentHandler.endElement("", "", "ENTRY");
}
}
private static List<AclEntry> readAclEntriesFromXml(Stanza st) {
List<AclEntry> aclEntries = Lists.newArrayList();
if (!st.hasChildren("ENTRY"))
return aclEntries;
List<Stanza> stanzas = st.getChildren("ENTRY");
for (Stanza s : stanzas) {
AclEntry e = new AclEntry.Builder()
.setScope(AclEntryScope.valueOf(s.getValue("SCOPE")))
.setType(AclEntryType.valueOf(s.getValue("TYPE")))
.setName(s.getValue("NAME"))
.setPermission(fsActionFromXml(s)).build();
aclEntries.add(e);
}
return aclEntries;
}
} }

View File

@ -49,7 +49,7 @@ final class ScopedAclEntries {
Collections.<AclEntry>emptyList(); Collections.<AclEntry>emptyList();
defaultEntries = aclEntries.subList(pivot, aclEntries.size()); defaultEntries = aclEntries.subList(pivot, aclEntries.size());
} else { } else {
accessEntries = !aclEntries.isEmpty() ? aclEntries : null; accessEntries = aclEntries;
defaultEntries = Collections.emptyList(); defaultEntries = Collections.emptyList();
} }
} }

View File

@ -27,6 +27,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
@ -786,6 +787,255 @@ public abstract class FSAclBaseTest {
assertAclFeature(true); assertAclFeature(true);
} }
@Test
public void testDefaultAclNewFile() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(path, aclSpec);
Path filePath = new Path(path, "file1");
fs.create(filePath).close();
AclStatus s = fs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
assertPermission(filePath, (short)02640);
assertAclFeature(filePath, true);
}
@Test
public void testOnlyAccessAclNewFile() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, "foo", ALL));
fs.modifyAclEntries(path, aclSpec);
Path filePath = new Path(path, "file1");
fs.create(filePath).close();
AclStatus s = fs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] { }, returned);
assertPermission(filePath, (short)0644);
assertAclFeature(filePath, false);
}
@Test
public void testDefaultMinimalAclNewFile() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, OTHER, NONE));
fs.setAcl(path, aclSpec);
Path filePath = new Path(path, "file1");
fs.create(filePath).close();
AclStatus s = fs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] { }, returned);
assertPermission(filePath, (short)0640);
assertAclFeature(filePath, false);
}
@Test
public void testDefaultAclNewDir() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(path, aclSpec);
Path dirPath = new Path(path, "dir1");
fs.mkdirs(dirPath);
AclStatus s = fs.getAclStatus(dirPath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, USER, "foo", ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, MASK, ALL),
aclEntry(DEFAULT, OTHER, NONE) }, returned);
assertPermission(dirPath, (short)02750);
assertAclFeature(dirPath, true);
}
@Test
public void testOnlyAccessAclNewDir() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, "foo", ALL));
fs.modifyAclEntries(path, aclSpec);
Path dirPath = new Path(path, "dir1");
fs.mkdirs(dirPath);
AclStatus s = fs.getAclStatus(dirPath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] { }, returned);
assertPermission(dirPath, (short)0755);
assertAclFeature(dirPath, false);
}
@Test
public void testDefaultMinimalAclNewDir() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, OTHER, NONE));
fs.setAcl(path, aclSpec);
Path dirPath = new Path(path, "dir1");
fs.mkdirs(dirPath);
AclStatus s = fs.getAclStatus(dirPath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, OTHER, NONE) }, returned);
assertPermission(dirPath, (short)02750);
assertAclFeature(dirPath, true);
}
@Test
public void testDefaultAclNewFileIntermediate() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(path, aclSpec);
Path dirPath = new Path(path, "dir1");
Path filePath = new Path(dirPath, "file1");
fs.create(filePath).close();
AclEntry[] expected = new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, USER, "foo", ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, MASK, ALL),
aclEntry(DEFAULT, OTHER, NONE) };
AclStatus s = fs.getAclStatus(dirPath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission(dirPath, (short)02750);
assertAclFeature(dirPath, true);
expected = new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE) };
s = fs.getAclStatus(filePath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission(filePath, (short)02640);
assertAclFeature(filePath, true);
}
@Test
public void testDefaultAclNewDirIntermediate() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(path, aclSpec);
Path dirPath = new Path(path, "dir1");
Path subdirPath = new Path(dirPath, "subdir1");
fs.mkdirs(subdirPath);
AclEntry[] expected = new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, USER, "foo", ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, MASK, ALL),
aclEntry(DEFAULT, OTHER, NONE) };
AclStatus s = fs.getAclStatus(dirPath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission(dirPath, (short)02750);
assertAclFeature(dirPath, true);
s = fs.getAclStatus(subdirPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission(subdirPath, (short)02750);
assertAclFeature(subdirPath, true);
}
@Test
public void testDefaultAclNewSymlinkIntermediate() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
Path filePath = new Path(path, "file1");
fs.create(filePath).close();
fs.setPermission(filePath, FsPermission.createImmutable((short)0640));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(path, aclSpec);
Path dirPath = new Path(path, "dir1");
Path linkPath = new Path(dirPath, "link1");
fs.createSymlink(filePath, linkPath, true);
AclEntry[] expected = new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, USER, "foo", ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, MASK, ALL),
aclEntry(DEFAULT, OTHER, NONE) };
AclStatus s = fs.getAclStatus(dirPath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission(dirPath, (short)02750);
assertAclFeature(dirPath, true);
expected = new AclEntry[] { };
s = fs.getAclStatus(linkPath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission(linkPath, (short)0640);
assertAclFeature(linkPath, false);
s = fs.getAclStatus(filePath);
returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(expected, returned);
assertPermission(filePath, (short)0640);
assertAclFeature(filePath, false);
}
@Test
public void testDefaultAclNewFileWithMode() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0755));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(path, aclSpec);
Path filePath = new Path(path, "file1");
int bufferSize = cluster.getConfiguration(0).getInt(
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
fs.create(filePath, new FsPermission((short)0740), false, bufferSize,
fs.getDefaultReplication(filePath), fs.getDefaultBlockSize(path), null)
.close();
AclStatus s = fs.getAclStatus(filePath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
assertPermission(filePath, (short)02740);
assertAclFeature(filePath, true);
}
@Test
public void testDefaultAclNewDirWithMode() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0755));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(path, aclSpec);
Path dirPath = new Path(path, "dir1");
fs.mkdirs(dirPath, new FsPermission((short)0740));
AclStatus s = fs.getAclStatus(dirPath);
AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, USER, "foo", ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, MASK, ALL),
aclEntry(DEFAULT, OTHER, READ_EXECUTE) }, returned);
assertPermission(dirPath, (short)02740);
assertAclFeature(dirPath, true);
}
/** /**
* Asserts whether or not the inode for the test path has an AclFeature. * Asserts whether or not the inode for the test path has an AclFeature.
* *
@ -795,8 +1045,21 @@ public abstract class FSAclBaseTest {
*/ */
private static void assertAclFeature(boolean expectAclFeature) private static void assertAclFeature(boolean expectAclFeature)
throws IOException { throws IOException {
assertAclFeature(path, expectAclFeature);
}
/**
* Asserts whether or not the inode for a specific path has an AclFeature.
*
* @param pathToCheck Path inode to check
* @param expectAclFeature boolean true if an AclFeature must be present,
* false if an AclFeature must not be present
* @throws IOException thrown if there is an I/O error
*/
private static void assertAclFeature(Path pathToCheck,
boolean expectAclFeature) throws IOException {
INode inode = cluster.getNamesystem().getFSDirectory().getRoot() INode inode = cluster.getNamesystem().getFSDirectory().getRoot()
.getNode(path.toUri().getPath(), false); .getNode(pathToCheck.toUri().getPath(), false);
assertNotNull(inode); assertNotNull(inode);
AclFeature aclFeature = inode.getAclFeature(); AclFeature aclFeature = inode.getAclFeature();
if (expectAclFeature) { if (expectAclFeature) {
@ -813,7 +1076,19 @@ public abstract class FSAclBaseTest {
* @throws IOException thrown if there is an I/O error * @throws IOException thrown if there is an I/O error
*/ */
private static void assertPermission(short perm) throws IOException { private static void assertPermission(short perm) throws IOException {
assertPermission(path, perm);
}
/**
* Asserts the value of the FsPermission bits on the inode of a specific path.
*
* @param pathToCheck Path inode to check
* @param perm short expected permission bits
* @throws IOException thrown if there is an I/O error
*/
private static void assertPermission(Path pathToCheck, short perm)
throws IOException {
assertEquals(FsPermission.createImmutable(perm), assertEquals(FsPermission.createImmutable(perm),
fs.getFileStatus(path).getPermission()); fs.getFileStatus(pathToCheck).getPermission());
} }
} }

View File

@ -23,11 +23,13 @@ import static org.apache.hadoop.fs.permission.AclEntryType.*;
import static org.apache.hadoop.fs.permission.FsAction.*; import static org.apache.hadoop.fs.permission.FsAction.*;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@ -64,14 +66,7 @@ public class TestFSImageWithAcl {
.setPermission(READ_EXECUTE).setScope(ACCESS).setType(USER).build(); .setPermission(READ_EXECUTE).setScope(ACCESS).setType(USER).build();
fs.modifyAclEntries(p, Lists.newArrayList(e)); fs.modifyAclEntries(p, Lists.newArrayList(e));
if (persistNamespace) { restart(fs, persistNamespace);
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
}
cluster.restartNameNode();
cluster.waitActive();
AclStatus s = cluster.getNamesystem().getAclStatus(p.toString()); AclStatus s = cluster.getNamesystem().getAclStatus(p.toString());
AclEntry[] returned = Lists.newArrayList(s.getEntries()).toArray( AclEntry[] returned = Lists.newArrayList(s.getEntries()).toArray(
@ -112,4 +107,133 @@ public class TestFSImageWithAcl {
public void testAclEditLog() throws IOException { public void testAclEditLog() throws IOException {
testAcl(false); testAcl(false);
} }
private void doTestDefaultAclNewChildren(boolean persistNamespace)
throws IOException {
Path dirPath = new Path("/dir");
Path filePath = new Path(dirPath, "file1");
Path subdirPath = new Path(dirPath, "subdir1");
DistributedFileSystem fs = cluster.getFileSystem();
fs.mkdirs(dirPath);
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
fs.setAcl(dirPath, aclSpec);
fs.create(filePath).close();
fs.mkdirs(subdirPath);
AclEntry[] fileExpected = new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE) };
AclEntry[] subdirExpected = new AclEntry[] {
aclEntry(ACCESS, USER, "foo", ALL),
aclEntry(ACCESS, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, USER, "foo", ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, MASK, ALL),
aclEntry(DEFAULT, OTHER, READ_EXECUTE) };
AclEntry[] fileReturned = fs.getAclStatus(filePath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(fileExpected, fileReturned);
AclEntry[] subdirReturned = fs.getAclStatus(subdirPath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(subdirExpected, subdirReturned);
assertPermission(fs, subdirPath, (short)02755);
restart(fs, persistNamespace);
fileReturned = fs.getAclStatus(filePath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(fileExpected, fileReturned);
subdirReturned = fs.getAclStatus(subdirPath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(subdirExpected, subdirReturned);
assertPermission(fs, subdirPath, (short)02755);
aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "foo", READ_WRITE));
fs.modifyAclEntries(dirPath, aclSpec);
fileReturned = fs.getAclStatus(filePath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(fileExpected, fileReturned);
subdirReturned = fs.getAclStatus(subdirPath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(subdirExpected, subdirReturned);
assertPermission(fs, subdirPath, (short)02755);
restart(fs, persistNamespace);
fileReturned = fs.getAclStatus(filePath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(fileExpected, fileReturned);
subdirReturned = fs.getAclStatus(subdirPath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(subdirExpected, subdirReturned);
assertPermission(fs, subdirPath, (short)02755);
fs.removeAcl(dirPath);
fileReturned = fs.getAclStatus(filePath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(fileExpected, fileReturned);
subdirReturned = fs.getAclStatus(subdirPath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(subdirExpected, subdirReturned);
assertPermission(fs, subdirPath, (short)02755);
restart(fs, persistNamespace);
fileReturned = fs.getAclStatus(filePath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(fileExpected, fileReturned);
subdirReturned = fs.getAclStatus(subdirPath).getEntries()
.toArray(new AclEntry[0]);
Assert.assertArrayEquals(subdirExpected, subdirReturned);
assertPermission(fs, subdirPath, (short)02755);
}
@Test
public void testFsImageDefaultAclNewChildren() throws IOException {
doTestDefaultAclNewChildren(true);
}
@Test
public void testEditLogDefaultAclNewChildren() throws IOException {
doTestDefaultAclNewChildren(false);
}
/**
* Asserts the value of the FsPermission bits on the inode of a specific path.
*
* @param fs DistributedFileSystem to use for check
* @param pathToCheck Path inode to check
* @param perm short expected permission bits
* @throws IOException thrown if there is an I/O error
*/
private static void assertPermission(DistributedFileSystem fs,
Path pathToCheck, short perm) throws IOException {
Assert.assertEquals(FsPermission.createImmutable(perm),
fs.getFileStatus(pathToCheck).getPermission());
}
/**
* Restart the NameNode, optionally saving a new checkpoint.
*
* @param fs DistributedFileSystem used for saving namespace
* @param persistNamespace boolean true to save a new checkpoint
* @throws IOException if restart fails
*/
private void restart(DistributedFileSystem fs, boolean persistNamespace)
throws IOException {
if (persistNamespace) {
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
}
cluster.restartNameNode();
cluster.waitActive();
}
} }

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest; import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
/** /**
* Tests ACL APIs via WebHDFS. * Tests ACL APIs via WebHDFS.
@ -37,4 +39,14 @@ public class TestWebHDFSAcl extends FSAclBaseTest {
fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME); fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
assertTrue(fs instanceof WebHdfsFileSystem); assertTrue(fs instanceof WebHdfsFileSystem);
} }
/**
* We need to skip this test on WebHDFS, because WebHDFS currently cannot
* resolve symlinks.
*/
@Override
@Test
@Ignore
public void testDefaultAclNewSymlinkIntermediate() {
}
} }

View File

@ -593,12 +593,16 @@
</comparator> </comparator>
<comparator> <comparator>
<type>SubstringComparator</type> <type>SubstringComparator</type>
<expected-output>group::r--</expected-output> <expected-output>group::r-x</expected-output>
</comparator> </comparator>
<comparator> <comparator>
<type>SubstringComparator</type> <type>SubstringComparator</type>
<expected-output>group:admin:rwx</expected-output> <expected-output>group:admin:rwx</expected-output>
</comparator> </comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>mask::r--</expected-output>
</comparator>
<comparator> <comparator>
<type>SubstringComparator</type> <type>SubstringComparator</type>
<expected-output>other::r--</expected-output> <expected-output>other::r--</expected-output>