HDFS-5925. ACL configuration flag must only reject ACL API calls, not ACLs present in fsimage or edits. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1567450 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-02-11 23:13:48 +00:00
parent ff0abf1929
commit 6d2a0aa1db
6 changed files with 26 additions and 57 deletions

View File

@ -72,6 +72,9 @@ HDFS-4685 (Unreleased)
HDFS-5625. Write end user documentation for HDFS ACLs. (cnauroth) HDFS-5625. Write end user documentation for HDFS ACLs. (cnauroth)
HDFS-5925. ACL configuration flag must only reject ACL API calls, not ACLs
present in fsimage or edits. (cnauroth)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -24,8 +24,7 @@ import org.apache.hadoop.hdfs.protocol.AclException;
/** /**
* Support for ACLs is controlled by a configuration flag. If the configuration * Support for ACLs is controlled by a configuration flag. If the configuration
* flag is false, then the NameNode will reject all ACL-related operations and * flag is false, then the NameNode will reject all ACL-related operations.
* refuse to load an fsimage or edit log containing ACLs.
*/ */
final class AclConfigFlag { final class AclConfigFlag {
private final boolean enabled; private final boolean enabled;
@ -47,28 +46,11 @@ final class AclConfigFlag {
* @throws AclException if ACLs are disabled * @throws AclException if ACLs are disabled
*/ */
public void checkForApiCall() throws AclException { public void checkForApiCall() throws AclException {
check("The ACL operation has been rejected.");
}
/**
* Checks the flag on behalf of edit log loading.
*
* @throws AclException if ACLs are disabled
*/
public void checkForEditLog() throws AclException {
check("Cannot load edit log containing an ACL.");
}
/**
* Common check method.
*
* @throws AclException if ACLs are disabled
*/
private void check(String reason) throws AclException {
if (!enabled) { if (!enabled) {
throw new AclException(String.format( throw new AclException(String.format(
"%s Support for ACLs has been disabled by setting %s to false.", "The ACL operation has been rejected. "
reason, DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY)); + "Support for ACLs has been disabled by setting %s to false.",
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY));
} }
} }
} }

View File

@ -294,9 +294,6 @@ public class FSEditLogLoader {
switch (op.opCode) { switch (op.opCode) {
case OP_ADD: { case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op; AddCloseOp addCloseOp = (AddCloseOp)op;
if (addCloseOp.aclEntries != null) {
fsNamesys.getAclConfigFlag().checkForEditLog();
}
final String path = final String path =
renameReservedPathsOnUpgrade(addCloseOp.path, logVersion); renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
if (FSNamesystem.LOG.isDebugEnabled()) { if (FSNamesystem.LOG.isDebugEnabled()) {
@ -485,9 +482,6 @@ public class FSEditLogLoader {
} }
case OP_MKDIR: { case OP_MKDIR: {
MkdirOp mkdirOp = (MkdirOp)op; MkdirOp mkdirOp = (MkdirOp)op;
if (mkdirOp.aclEntries != null) {
fsNamesys.getAclConfigFlag().checkForEditLog();
}
inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion, inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
lastInodeId); lastInodeId);
fsDir.unprotectedMkdir(inodeId, fsDir.unprotectedMkdir(inodeId,
@ -749,7 +743,6 @@ public class FSEditLogLoader {
break; break;
} }
case OP_SET_ACL: { case OP_SET_ACL: {
fsNamesys.getAclConfigFlag().checkForEditLog();
SetAclOp setAclOp = (SetAclOp) op; SetAclOp setAclOp = (SetAclOp) op;
fsDir.unprotectedSetAcl(setAclOp.src, setAclOp.aclEntries); fsDir.unprotectedSetAcl(setAclOp.src, setAclOp.aclEntries);
break; break;

View File

@ -7393,10 +7393,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return results; return results;
} }
AclConfigFlag getAclConfigFlag() {
return aclConfigFlag;
}
void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException { void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
aclConfigFlag.checkForApiCall(); aclConfigFlag.checkForApiCall();
HdfsFileStatus resultingStat = null; HdfsFileStatus resultingStat = null;

View File

@ -351,11 +351,7 @@
<description> <description>
Set to true to enable support for HDFS ACLs (Access Control Lists). By Set to true to enable support for HDFS ACLs (Access Control Lists). By
default, ACLs are disabled. When ACLs are disabled, the NameNode rejects default, ACLs are disabled. When ACLs are disabled, the NameNode rejects
all attempts to set an ACL. An fsimage containing an ACL will cause the all RPCs related to setting or getting ACLs.
NameNode to abort during startup, and ACLs present in the edit log will
cause the NameNode to abort. To transition from ACLs enabled to ACLs
disabled, restart the NameNode with ACLs enabled, remove all ACLs, save a
new checkpoint, and then restart the NameNode with ACLs disabled.
</description> </description>
</property> </property>

View File

@ -23,8 +23,6 @@ import static org.apache.hadoop.fs.permission.AclEntryType.*;
import static org.apache.hadoop.fs.permission.FsAction.*; import static org.apache.hadoop.fs.permission.FsAction.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -34,7 +32,6 @@ import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -44,9 +41,8 @@ import com.google.common.collect.Lists;
/** /**
* Tests that the configuration flag that controls support for ACLs is off by * Tests that the configuration flag that controls support for ACLs is off by
* default and causes all attempted operations related to ACLs to fail. This * default and causes all attempted operations related to ACLs to fail. The
* includes the API calls, ACLs found while loading fsimage and ACLs found while * NameNode can still load ACLs from fsimage or edits.
* applying edit log ops.
*/ */
public class TestAclConfigFlag { public class TestAclConfigFlag {
private static final Path PATH = new Path("/path"); private static final Path PATH = new Path("/path");
@ -125,20 +121,23 @@ public class TestAclConfigFlag {
fs.setAcl(PATH, Lists.newArrayList( fs.setAcl(PATH, Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", READ_WRITE))); aclEntry(DEFAULT, USER, "foo", READ_WRITE)));
// Attempt restart with ACLs disabled. // Restart with ACLs disabled. Expect successful restart.
try {
restart(false, false); restart(false, false);
fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, e);
} }
// Recover by restarting with ACLs enabled, deleting the ACL, saving a new @Test
// checkpoint, and then restarting with ACLs disabled. public void testFsImage() throws Exception {
restart(false, true); // With ACLs enabled, set an ACL.
fs.removeAcl(PATH); initCluster(true, true);
restart(true, false); fs.mkdirs(PATH);
fs.setAcl(PATH, Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", READ_WRITE)));
// Save a new checkpoint and restart with ACLs still enabled.
restart(true, true);
// Restart with ACLs disabled. Expect successful restart.
restart(false, false);
} }
/** /**