HBASE-15200 ZooKeeper znode ACL checks should only compare the shortname

This commit is contained in:
Andrew Purtell 2016-02-01 09:48:16 -08:00
parent 2f5767376f
commit 6256ce4e63
1 changed files with 52 additions and 4 deletions

View File

@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -126,6 +128,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
private final Exception constructorCaller;
/* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
/**
* Instantiate a ZooKeeper connection and watcher.
* @param identifier string that is passed to RecoverableZookeeper to be used as
@ -215,6 +220,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
*/
public void checkAndSetZNodeAcls() {
if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
LOG.info("not a secure deployment, proceeding");
return;
}
@ -259,6 +265,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
* @throws IOException
*/
private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking znode ACLs");
}
String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
// Check whether ACL set for all superusers
if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
@ -270,6 +279,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
if (acls.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("ACL is empty");
}
return false;
}
@ -280,17 +292,45 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// and one for the hbase user
if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
if (perms != Perms.READ) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("permissions for '%s' are not correct: have %0x, want %0x",
id, perms, Perms.READ));
}
return false;
}
} else if (superUsers != null && isSuperUserId(superUsers, id)) {
if (perms != Perms.ALL) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("permissions for '%s' are not correct: have %0x, want %0x",
id, perms, Perms.ALL));
}
return false;
}
} else if (new Id("sasl", hbaseUser).equals(id)) {
} else if ("sasl".equals(id.getScheme())) {
String name = id.getId();
// If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
Matcher match = NAME_PATTERN.matcher(name);
if (match.matches()) {
name = match.group(1);
}
if (name.equals(hbaseUser)) {
if (perms != Perms.ALL) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("permissions for '%s' are not correct: have %0x, want %0x",
id, perms, Perms.ALL));
}
return false;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected shortname in SASL ACL: " + id);
}
return false;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("unexpected ACL id '" + id + "'");
}
return false;
}
}
@ -306,8 +346,16 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// TODO: Validate super group members also when ZK supports setting node ACL for groups.
if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
for (ACL acl : acls) {
if (user.equals(acl.getId().getId()) && acl.getPerms() == Perms.ALL) {
if (user.equals(acl.getId().getId())) {
if (acl.getPerms() == Perms.ALL) {
hasAccess = true;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"superuser '%s' does not have correct permissions: have %0x, want %0x",
acl.getId().getId(), acl.getPerms(), Perms.ALL));
}
}
break;
}
}