HDFS-5899. Add configuration flag to disable/enable support for ACLs. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1566041 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-02-08 16:20:29 +00:00
parent c89c516b95
commit dd2eb97ddd
17 changed files with 357 additions and 1 deletions

View File

@ -64,6 +64,9 @@ HDFS-4685 (Unreleased)
HDFS-5616. NameNode: implement default ACL handling. (cnauroth)
HDFS-5899. Add configuration flag to disable/enable support for ACLs.
(cnauroth)
OPTIMIZATIONS
BUG FIXES

View File

@ -2725,6 +2725,7 @@ public class DFSClient implements java.io.Closeable {
return namenode.getAclStatus(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
AclException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
}

View File

@ -179,6 +179,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_PERMISSIONS_ENABLED_DEFAULT = true;
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_KEY = "dfs.permissions.superusergroup";
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup";
public static final String DFS_NAMENODE_ACLS_ENABLED_KEY = "dfs.namenode.acls.enabled";
public static final boolean DFS_NAMENODE_ACLS_ENABLED_DEFAULT = false;
public static final String DFS_ADMIN = "dfs.cluster.administrators";
public static final String DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.https.server.keystore.resource";
public static final String DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-server.xml";

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.AclException;
/**
* Support for ACLs is controlled by a configuration flag. If the configuration
* flag is false, then the NameNode will reject all ACL-related operations and
* refuse to load an fsimage or edit log containing ACLs.
*/
final class AclConfigFlag {
private final boolean enabled;
/**
* Creates a new AclConfigFlag from configuration.
*
* @param conf Configuration to check
*/
public AclConfigFlag(Configuration conf) {
enabled = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
LogFactory.getLog(AclConfigFlag.class).info("ACLs enabled? " + enabled);
}
/**
* Checks the flag on behalf of an ACL API call.
*
* @throws AclException if ACLs are disabled
*/
public void checkForApiCall() throws AclException {
check("The ACL operation has been rejected.");
}
/**
* Checks the flag on behalf of edit log loading.
*
* @throws AclException if ACLs are disabled
*/
public void checkForEditLog() throws AclException {
check("Cannot load edit log containing an ACL.");
}
/**
* Checks the flag on behalf of fsimage loading.
*
* @throws AclException if ACLs are disabled
*/
public void checkForFsImage() throws AclException {
check("Cannot load fsimage containing an ACL.");
}
/**
* Common check method.
*
* @throws AclException if ACLs are disabled
*/
private void check(String reason) throws AclException {
if (!enabled) {
throw new AclException(String.format(
"%s Support for ACLs has been disabled by setting %s to false.",
reason, DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY));
}
}
}

View File

@ -293,6 +293,9 @@ public class FSEditLogLoader {
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
if (addCloseOp.aclEntries != null) {
fsNamesys.getAclConfigFlag().checkForEditLog();
}
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
" numblocks : " + addCloseOp.blocks.length +
@ -466,6 +469,9 @@ public class FSEditLogLoader {
}
case OP_MKDIR: {
MkdirOp mkdirOp = (MkdirOp)op;
if (mkdirOp.aclEntries != null) {
fsNamesys.getAclConfigFlag().checkForEditLog();
}
inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
lastInodeId);
fsDir.unprotectedMkdir(inodeId, mkdirOp.path, mkdirOp.permissions,
@ -705,6 +711,7 @@ public class FSEditLogLoader {
break;
}
case OP_SET_ACL: {
fsNamesys.getAclConfigFlag().checkForEditLog();
SetAclOp setAclOp = (SetAclOp) op;
fsDir.unprotectedSetAcl(setAclOp.src, setAclOp.aclEntries);
break;

View File

@ -802,6 +802,7 @@ public class FSImageFormat {
private AclFeature loadAclFeature(DataInput in, final int imgVersion)
throws IOException {
namesystem.getAclConfigFlag().checkForFsImage();
AclFeature aclFeature = null;
if (LayoutVersion.supports(Feature.EXTENDED_ACL, imgVersion)) {
AclFsImageProto p = AclFsImageProto

View File

@ -503,7 +503,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private INodeId inodeId;
private final RetryCache retryCache;
private final AclConfigFlag aclConfigFlag;
/**
* Set the last allocated inode id when fsimage or editlog is loaded.
*/
@ -774,6 +776,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
this.aclConfigFlag = new AclConfigFlag(conf);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
@ -7350,7 +7353,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return results;
}
AclConfigFlag getAclConfigFlag() {
return aclConfigFlag;
}
void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
aclConfigFlag.checkForApiCall();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
@ -7371,6 +7379,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
aclConfigFlag.checkForApiCall();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
@ -7391,6 +7400,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
void removeDefaultAcl(String src) throws IOException {
aclConfigFlag.checkForApiCall();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
@ -7411,6 +7421,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
void removeAcl(String src) throws IOException {
aclConfigFlag.checkForApiCall();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
@ -7431,6 +7442,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
aclConfigFlag.checkForApiCall();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
@ -7451,6 +7463,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
AclStatus getAclStatus(String src) throws IOException {
aclConfigFlag.checkForApiCall();
checkOperation(OperationCategory.READ);
readLock();
try {

View File

@ -345,6 +345,20 @@
</property>
-->
<property>
<name>dfs.namenode.acls.enabled</name>
<value>false</value>
<description>
Set to true to enable support for HDFS ACLs (Access Control Lists). By
default, ACLs are disabled. When ACLs are disabled, the NameNode rejects
all attempts to set an ACL. An fsimage containing an ACL will cause the
NameNode to abort during startup, and ACLs present in the edit log will
cause the NameNode to abort. To transition from ACLs enabled to ACLs
disabled, restart the NameNode with ACLs enabled, remove all ACLs, save a
new checkpoint, and then restart the NameNode with ACLs disabled.
</description>
</property>
<property>
<name>dfs.block.access.token.enable</name>
<value>false</value>

View File

@ -36,6 +36,7 @@ public class TestAclCLI extends CLITestHelperDFS {
@Override
public void setUp() throws Exception {
super.setUp();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
fs = cluster.getFileSystem();
namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");

View File

@ -64,6 +64,7 @@ public class TestStickyBit {
public static void init() throws Exception {
conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
initCluster(true);
}

View File

@ -66,6 +66,7 @@ public class TestSafeMode {
public void startUp() throws IOException {
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();

View File

@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.*;
import static org.apache.hadoop.fs.permission.AclEntryScope.*;
import static org.apache.hadoop.fs.permission.AclEntryType.*;
import static org.apache.hadoop.fs.permission.FsAction.*;
import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import com.google.common.collect.Lists;
/**
* Tests that the configuration flag that controls support for ACLs is off by
* default and causes all attempted operations related to ACLs to fail. This
* includes the API calls, ACLs found while loading fsimage and ACLs found while
* applying edit log ops.
*/
public class TestAclConfigFlag {
private static final Path PATH = new Path("/path");
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
@Rule
public ExpectedException exception = ExpectedException.none();
@After
public void shutdown() throws Exception {
IOUtils.cleanup(null, fs);
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testModifyAclEntries() throws Exception {
initCluster(true, false);
fs.mkdirs(PATH);
expectException();
fs.modifyAclEntries(PATH, Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", READ_WRITE)));
}
@Test
public void testRemoveAclEntries() throws Exception {
initCluster(true, false);
fs.mkdirs(PATH);
expectException();
fs.removeAclEntries(PATH, Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", READ_WRITE)));
}
@Test
public void testRemoveDefaultAcl() throws Exception {
initCluster(true, false);
fs.mkdirs(PATH);
expectException();
fs.removeAclEntries(PATH, Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", READ_WRITE)));
}
@Test
public void testRemoveAcl() throws Exception {
initCluster(true, false);
fs.mkdirs(PATH);
expectException();
fs.removeAcl(PATH);
}
@Test
public void testSetAcl() throws Exception {
initCluster(true, false);
fs.mkdirs(PATH);
expectException();
fs.setAcl(PATH, Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", READ_WRITE)));
}
@Test
public void testGetAclStatus() throws Exception {
initCluster(true, false);
fs.mkdirs(PATH);
expectException();
fs.getAclStatus(PATH);
}
@Test
public void testEditLog() throws Exception {
// With ACLs enabled, set an ACL.
initCluster(true, true);
fs.mkdirs(PATH);
fs.setAcl(PATH, Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", READ_WRITE)));
// Attempt restart with ACLs disabled.
try {
restart(false, false);
fail("expected IOException");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, e);
}
// Recover by restarting with ACLs enabled, deleting the ACL, saving a new
// checkpoint, and then restarting with ACLs disabled.
restart(false, true);
fs.removeAcl(PATH);
restart(true, false);
}
@Test
public void testFsImage() throws Exception {
// With ACLs enabled, set an ACL.
initCluster(true, true);
fs.mkdirs(PATH);
fs.setAcl(PATH, Lists.newArrayList(
aclEntry(DEFAULT, USER, "foo", READ_WRITE)));
// Save a new checkpoint and restart with ACLs still enabled.
restart(true, true);
// Attempt restart with ACLs disabled.
try {
restart(false, false);
fail("expected IOException");
} catch (IOException e) {
// Unfortunately, we can't assert on the message containing the
// configuration key here. That message is logged, but a more generic
// fsimage loading exception propagates up to this layer.
GenericTestUtils.assertExceptionContains(
"Failed to load an FSImage file", e);
}
// Recover by restarting with ACLs enabled, deleting the ACL, saving a new
// checkpoint, and then restarting with ACLs disabled.
restart(false, true);
fs.removeAcl(PATH);
restart(true, false);
}
/**
* We expect an AclException, and we want the exception text to state the
* configuration key that controls ACL support.
*/
private void expectException() {
exception.expect(AclException.class);
exception.expectMessage(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY);
}
/**
* Initialize the cluster, wait for it to become active, and get FileSystem.
*
* @param format if true, format the NameNode and DataNodes before starting up
* @param aclsEnabled if true, ACL support is enabled
* @throws Exception if any step fails
*/
private void initCluster(boolean format, boolean aclsEnabled)
throws Exception {
Configuration conf = new Configuration();
// not explicitly setting to false, should be false by default
if (aclsEnabled) {
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
}
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(format)
.build();
cluster.waitActive();
fs = cluster.getFileSystem();
}
/**
* Restart the cluster, optionally saving a new checkpoint.
*
* @param checkpoint boolean true to save a new checkpoint
* @param aclsEnabled if true, ACL support is enabled
* @throws Exception if restart fails
*/
private void restart(boolean checkpoint, boolean aclsEnabled)
throws Exception {
NameNode nameNode = cluster.getNameNode();
if (checkpoint) {
NameNodeAdapter.enterSafeMode(nameNode, false);
NameNodeAdapter.saveNamespace(nameNode);
}
shutdown();
initCluster(false, aclsEnabled);
}
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@ -47,6 +48,7 @@ public class TestFSImageWithAcl {
@BeforeClass
public static void setUp() throws IOException {
conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.BeforeClass;
@ -33,6 +34,7 @@ public class TestNameNodeAcl extends FSAclBaseTest {
@BeforeClass
public static void init() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -75,6 +76,7 @@ public class TestAclWithSnapshot {
@BeforeClass
public static void init() throws Exception {
conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
initCluster(true);
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.junit.BeforeClass;
@ -34,6 +35,7 @@ public class TestWebHDFSAcl extends FSAclBaseTest {
@BeforeClass
public static void init() throws Exception {
Configuration conf = WebHdfsTestUtil.createConf();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);

View File

@ -71,6 +71,7 @@ public class TestPermissionSymlinks {
@BeforeClass
public static void beforeClassSetUp() throws Exception {
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.set(FsPermission.UMASK_LABEL, "000");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();