HBASE-14425 In Secure Zookeeper cluster superuser will not have sufficient permission if multiple values are configured in hbase.superuser (Pankaj Kumar)

This commit is contained in:
Enis Soztutar 2015-10-27 16:56:20 -07:00
parent 007e4dfa13
commit 0e6dd3257b
4 changed files with 85 additions and 7 deletions

View File

@ -39,6 +39,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@ -1028,11 +1030,23 @@ public class ZKUtil {
return Ids.OPEN_ACL_UNSAFE;
}
if (isSecureZooKeeper) {
String superUser = zkw.getConfiguration().get("hbase.superuser");
ArrayList<ACL> acls = new ArrayList<ACL>();
// add permission to hbase supper user
if (superUser != null) {
acls.add(new ACL(Perms.ALL, new Id("auth", superUser)));
String[] superUsers = zkw.getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY);
if (superUsers != null) {
List<String> groups = new ArrayList<String>();
for (String user : superUsers) {
if (user.startsWith(AuthUtil.GROUP_PREFIX)) {
// TODO: Set node ACL for groups when ZK supports this feature
groups.add(user);
} else {
acls.add(new ACL(Perms.ALL, new Id("auth", user)));
}
}
if (!groups.isEmpty()) {
LOG.warn("Znode ACL setting for group " + groups
+ " is skipped, Zookeeper doesn't support this feature presently.");
}
}
// Certain znodes are accessed directly by the client,
// so they must be readable by non-authenticated clients

View File

@ -31,10 +31,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@ -257,7 +259,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
* @throws IOException
*/
private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
String superUser = conf.get("hbase.superuser");
String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
// Check whether ACL set for all superusers
if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
return false;
}
// this assumes that current authenticated user is the same as zookeeper client user
// configured via JAAS
@ -276,7 +282,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
if (perms != Perms.READ) {
return false;
}
} else if (superUser != null && new Id("sasl", superUser).equals(id)) {
} else if (superUsers != null && isSuperUserId(superUsers, id)) {
if (perms != Perms.ALL) {
return false;
}
@ -290,6 +296,41 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
}
return true;
}
/*
* Validate whether ACL set for all superusers.
*/
private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
for (String user : superUsers) {
boolean hasAccess = false;
// TODO: Validate super group members also when ZK supports setting node ACL for groups.
if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
for (ACL acl : acls) {
if (user.equals(acl.getId().getId()) && acl.getPerms() == Perms.ALL) {
hasAccess = true;
break;
}
}
if (!hasAccess) {
return false;
}
}
}
return true;
}
/*
* Validate whether ACL ID is superuser.
*/
public static boolean isSuperUserId(String[] superUsers, Id id) {
for (String user : superUsers) {
// TODO: Validate super group members also when ZK supports setting node ACL for groups.
if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) {
return true;
}
}
return false;
}
@Override
public String toString() {

View File

@ -18,11 +18,18 @@
*/
package org.apache.hadoop.hbase.zookeeper;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -43,4 +50,19 @@ public class TestZKUtil {
Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
Assert.assertEquals("localhost:3333:hbase,test", clusterKey);
}
@Test
public void testCreateACL() throws ZooKeeperConnectionException, IOException {
Configuration conf = HBaseConfiguration.create();
conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3");
String node = "/hbase/testCreateACL";
ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, node, null, false);
List<ACL> aclList = ZKUtil.createACL(watcher, node, true);
Assert.assertEquals(aclList.size(), 4); // 3+1, since ACL will be set for the creator by default
Assert.assertTrue(!aclList.contains(new ACL(Perms.ALL, new Id("auth", "@group1")))
&& !aclList.contains(new ACL(Perms.ALL, new Id("auth", "@group2"))));
Assert.assertTrue(aclList.contains(new ACL(Perms.ALL, new Id("auth", "user1")))
&& aclList.contains(new ACL(Perms.ALL, new Id("auth", "user2")))
&& aclList.contains(new ACL(Perms.ALL, new Id("auth", "user3"))));
}
}

View File

@ -178,6 +178,7 @@ public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool {
boolean expectedWorldReadable) throws KeeperException, InterruptedException {
Stat stat = new Stat();
List<ACL> acls = zk.getZooKeeper().getACL(znode, stat);
String[] superUsers = superUser == null ? null : superUser.split(",");
LOG.info("Checking ACLs for znode znode:" + znode + " acls:" + acls);
@ -191,7 +192,7 @@ public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool {
assertTrue(expectedWorldReadable);
// assert that anyone can only read
assertEquals(perms, Perms.READ);
} else if (superUser != null && new Id("sasl", superUser).equals(id)) {
} else if (superUsers != null && ZooKeeperWatcher.isSuperUserId(superUsers, id)) {
// assert that super user has all the permissions
assertEquals(perms, Perms.ALL);
} else if (new Id("sasl", masterPrincipal).equals(id)) {