YARN-1528. Allow setting auth for ZK connections. (kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1573017 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-02-28 17:36:04 +00:00
parent d00de6418a
commit 052e6e7fa6
6 changed files with 115 additions and 25 deletions

View File

@ -226,6 +226,8 @@ Release 2.4.0 - UNRELEASED
YARN-1301. Added the INFO level log of the non-empty blacklist additions
and removals inside ApplicationMasterService. (Tsuyoshi Ozawa via zjshen)
YARN-1528. Allow setting auth for ZK connections. (kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -335,6 +335,8 @@ public class YarnConfiguration extends Configuration {
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
public static final String RM_ZK_AUTH = RM_ZK_PREFIX + "auth";
public static final String ZK_STATE_STORE_PREFIX =
RM_PREFIX + "zk-state-store.";

View File

@ -31,14 +31,12 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@InterfaceAudience.Private
@ -88,18 +86,8 @@ public class EmbeddedElectorService extends AbstractService
long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
String zkAclConf = conf.get(YarnConfiguration.RM_ZK_ACL,
YarnConfiguration.DEFAULT_RM_ZK_ACL);
List<ACL> zkAcls;
try {
zkAcls = ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
} catch (ZKUtil.BadAclFormatException bafe) {
throw new YarnRuntimeException(
YarnConfiguration.RM_ZK_ACL + "has ill-formatted ACLs");
}
// TODO (YARN-1528): ZKAuthInfo to be set for rm-store and elector
List<ZKUtil.ZKAuthInfo> zkAuths = Collections.emptyList();
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
electionZNode, zkAcls, zkAuths, this);

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.zookeeper.data.ACL;
import java.util.Collections;
import java.util.List;
/**
* Helper class that provides utility methods specific to ZK operations
*/
@InterfaceAudience.Private
public class RMZKUtils {
private static final Log LOG = LogFactory.getLog(RMZKUtils.class);
/**
* Utility method to fetch the ZK ACLs from the configuration
*/
public static List<ACL> getZKAcls(Configuration conf) throws Exception {
// Parse authentication from configuration.
String zkAclConf =
conf.get(YarnConfiguration.RM_ZK_ACL,
YarnConfiguration.DEFAULT_RM_ZK_ACL);
try {
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
return ZKUtil.parseACLs(zkAclConf);
} catch (Exception e) {
LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
throw e;
}
}
/**
* Utility method to fetch ZK auth info from the configuration
*/
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
throws Exception {
String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
try {
zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
if (zkAuthConf != null) {
return ZKUtil.parseAuth(zkAuthConf);
} else {
return Collections.emptyList();
}
} catch (Exception e) {
LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
throw e;
}
}
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Appli
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@ -91,6 +92,7 @@ public class ZKRMStateStore extends RMStateStore {
private int zkSessionTimeout;
private long zkRetryInterval;
private List<ACL> zkAcl;
private List<ZKUtil.ZKAuthInfo> zkAuths;
/**
*
@ -200,18 +202,9 @@ public class ZKRMStateStore extends RMStateStore {
zkRetryInterval =
conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
// Parse authentication from configuration.
String zkAclConf =
conf.get(YarnConfiguration.RM_ZK_ACL,
YarnConfiguration.DEFAULT_RM_ZK_ACL);
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
try {
zkAcl = ZKUtil.parseACLs(zkAclConf);
} catch (ZKUtil.BadAclFormatException bafe) {
LOG.error("Invalid format for " + YarnConfiguration.RM_ZK_ACL);
throw bafe;
}
zkAcl = RMZKUtils.getZKAcls(conf);
zkAuths = RMZKUtils.getZKAuths(conf);
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
@ -952,6 +945,9 @@ public class ZKRMStateStore extends RMStateStore {
retries++) {
try {
zkClient = getNewZooKeeper();
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
}
if (useDefaultFencingScheme) {
zkClient.addAuthInfo(zkRootNodeAuthScheme,
(zkRootNodeUsername + ":" + zkRootNodePassword).getBytes());

View File

@ -32,10 +32,12 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
@ -49,6 +51,20 @@ public class TestZKRMStateStoreZKClientConnections extends
private Log LOG =
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
private static final String DIGEST_USER_PASS="test-user:test-password";
private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS;
private static final String DIGEST_USER_HASH;
static {
try {
DIGEST_USER_HASH = DigestAuthenticationProvider.generateDigest(
DIGEST_USER_PASS);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda";
class TestZKClient {
ZKRMStateStore store;
@ -252,4 +268,16 @@ public class TestZKRMStateStoreZKClientConnections extends
fail(error);
}
}
@Test
public void testZKAuths() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_ZK_NUM_RETRIES, 1);
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 100);
conf.set(YarnConfiguration.RM_ZK_ACL, TEST_ACL);
conf.set(YarnConfiguration.RM_ZK_AUTH, TEST_AUTH_GOOD);
zkClientTester.getRMStateStore(conf);
}
}