HDFS-7560. ACLs removed by removeDefaultAcl() will be back after NameNode restart/failover. Contributed by Vinayakumar B.

(cherry picked from commit 2cf90a2c33)
This commit is contained in:
cnauroth 2014-12-22 13:59:10 -08:00
parent 6f252f7f36
commit 5486124668
4 changed files with 44 additions and 13 deletions

View File

@ -370,6 +370,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7557. Fix spacing for a few keys in DFSConfigKeys.java
(Colin P.McCabe)
HDFS-7560. ACLs removed by removeDefaultAcl() will be back after NameNode
restart/failover. (Vinayakumar B via cnauroth)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -143,7 +143,7 @@ static HdfsFileStatus setAcl(
try {
iip = fsd.getINodesInPath4Write(src);
fsd.checkOwner(pc, iip);
List<AclEntry> newAcl = unprotectedSetAcl(fsd, src, aclSpec);
List<AclEntry> newAcl = unprotectedSetAcl(fsd, src, aclSpec, false);
fsd.getEditLog().logSetAcl(src, newAcl);
} finally {
fsd.writeUnlock();
@ -185,7 +185,7 @@ static AclStatus getAclStatus(
}
static List<AclEntry> unprotectedSetAcl(
FSDirectory fsd, String src, List<AclEntry> aclSpec)
FSDirectory fsd, String src, List<AclEntry> aclSpec, boolean fromEdits)
throws IOException {
assert fsd.hasWriteLock();
final INodesInPath iip = fsd.getINodesInPath4Write(
@ -199,9 +199,11 @@ static List<AclEntry> unprotectedSetAcl(
INode inode = FSDirectory.resolveLastINode(iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
aclSpec);
List<AclEntry> newAcl = aclSpec;
if (!fromEdits) {
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
newAcl = AclTransformation.replaceAclEntries(existingAcl, aclSpec);
}
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
return newAcl;
}

View File

@ -827,7 +827,8 @@ fsDir, renameReservedPathsOnUpgrade(timesOp.path, logVersion),
}
case OP_SET_ACL: {
SetAclOp setAclOp = (SetAclOp) op;
FSDirAclOp.unprotectedSetAcl(fsDir, setAclOp.src, setAclOp.aclEntries);
FSDirAclOp.unprotectedSetAcl(fsDir, setAclOp.src, setAclOp.aclEntries,
true);
break;
}
case OP_SET_XATTR: {

View File

@ -426,7 +426,7 @@ public void testRemoveAclEntriesPathNotFound() throws IOException {
}
@Test
public void testRemoveDefaultAcl() throws IOException {
public void testRemoveDefaultAcl() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, ALL),
@ -443,10 +443,15 @@ public void testRemoveDefaultAcl() throws IOException {
aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
assertPermission((short)010770);
assertAclFeature(true);
// restart of the cluster
restartCluster();
s = fs.getAclStatus(path);
AclEntry[] afterRestart = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(returned, afterRestart);
}
@Test
public void testRemoveDefaultAclOnlyAccess() throws IOException {
public void testRemoveDefaultAclOnlyAccess() throws Exception {
fs.create(path).close();
fs.setPermission(path, FsPermission.createImmutable((short)0640));
List<AclEntry> aclSpec = Lists.newArrayList(
@ -463,10 +468,15 @@ public void testRemoveDefaultAclOnlyAccess() throws IOException {
aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
assertPermission((short)010770);
assertAclFeature(true);
// restart of the cluster
restartCluster();
s = fs.getAclStatus(path);
AclEntry[] afterRestart = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(returned, afterRestart);
}
@Test
public void testRemoveDefaultAclOnlyDefault() throws IOException {
public void testRemoveDefaultAclOnlyDefault() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", ALL));
@ -477,10 +487,15 @@ public void testRemoveDefaultAclOnlyDefault() throws IOException {
assertArrayEquals(new AclEntry[] { }, returned);
assertPermission((short)0750);
assertAclFeature(false);
// restart of the cluster
restartCluster();
s = fs.getAclStatus(path);
AclEntry[] afterRestart = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(returned, afterRestart);
}
@Test
public void testRemoveDefaultAclMinimal() throws IOException {
public void testRemoveDefaultAclMinimal() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
fs.removeDefaultAcl(path);
AclStatus s = fs.getAclStatus(path);
@ -488,10 +503,15 @@ public void testRemoveDefaultAclMinimal() throws IOException {
assertArrayEquals(new AclEntry[] { }, returned);
assertPermission((short)0750);
assertAclFeature(false);
// restart of the cluster
restartCluster();
s = fs.getAclStatus(path);
AclEntry[] afterRestart = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(returned, afterRestart);
}
@Test
public void testRemoveDefaultAclStickyBit() throws IOException {
public void testRemoveDefaultAclStickyBit() throws Exception {
FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)01750));
List<AclEntry> aclSpec = Lists.newArrayList(
aclEntry(ACCESS, USER, ALL),
@ -508,6 +528,11 @@ public void testRemoveDefaultAclStickyBit() throws IOException {
aclEntry(ACCESS, GROUP, READ_EXECUTE) }, returned);
assertPermission((short)011770);
assertAclFeature(true);
// restart of the cluster
restartCluster();
s = fs.getAclStatus(path);
AclEntry[] afterRestart = s.getEntries().toArray(new AclEntry[0]);
assertArrayEquals(returned, afterRestart);
}
@Test(expected=FileNotFoundException.class)
@ -1137,9 +1162,7 @@ public void testSkipAclEnforcementPermsDisabled() throws Exception {
assertFilePermissionDenied(fsAsDiana, DIANA, bruceFile);
try {
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
destroyFileSystems();
restartCluster();
initFileSystems();
assertFilePermissionGranted(fsAsDiana, DIANA, bruceFile);
} finally {
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
@ -1404,10 +1427,12 @@ private void initFileSystems() throws Exception {
* @throws Exception if restart fails
*/
private void restartCluster() throws Exception {
destroyFileSystems();
shutdown();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false)
.build();
cluster.waitActive();
initFileSystems();
}
/**