HADOOP-14741. Refactor curator based ZooKeeper communication into common library. (Íñigo Goiri via Subru).

This commit is contained in:
Subru Krishnan 2017-08-14 11:03:50 -07:00
parent 394573780b
commit a70efb6138
13 changed files with 566 additions and 202 deletions

View File

@ -364,4 +364,25 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
// HDFS client HTrace configuration.
public static final String FS_CLIENT_HTRACE_PREFIX = "fs.client.htrace.";
// Global ZooKeeper configuration keys
/** Common prefix shared by every global ZooKeeper configuration key. */
public static final String ZK_PREFIX = "hadoop.zk.";
/** ACL for the ZooKeeper ensemble. */
public static final String ZK_ACL = ZK_PREFIX + "acl";
/** Default value for {@link #ZK_ACL}. */
public static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
/** Authentication for the ZooKeeper ensemble. */
public static final String ZK_AUTH = ZK_PREFIX + "auth";
/** Address of the ZooKeeper ensemble. */
public static final String ZK_ADDRESS = ZK_PREFIX + "address";
/** Maximum number of retries for a ZooKeeper operation. */
public static final String ZK_NUM_RETRIES = ZK_PREFIX + "num-retries";
/** Default value for {@link #ZK_NUM_RETRIES}. */
public static final int ZK_NUM_RETRIES_DEFAULT = 1000;
/** ZooKeeper session timeout in milliseconds. */
public static final String ZK_TIMEOUT_MS = ZK_PREFIX + "timeout-ms";
/** Default value for {@link #ZK_TIMEOUT_MS}, in milliseconds. */
public static final int ZK_TIMEOUT_MS_DEFAULT = 10000;
/** How often to retry a ZooKeeper operation in milliseconds. */
public static final String ZK_RETRY_INTERVAL_MS =
ZK_PREFIX + "retry-interval-ms";
/** Default value for {@link #ZK_RETRY_INTERVAL_MS}, in milliseconds. */
public static final int ZK_RETRY_INTERVAL_MS_DEFAULT = 1000;
}

View File

@ -0,0 +1,294 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.util.curator;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Helper class that provides utility methods specific to ZK operations.
 *
 * <p>An instance wraps a single Curator client. The client is created by
 * {@link #start()} or {@link #start(List)}; until one of them has completed,
 * {@link #getCurator()} returns {@code null} and the ZNode helpers cannot be
 * used.
 */
@InterfaceAudience.Private
public final class ZKCuratorManager {

  private static final Logger LOG =
      LoggerFactory.getLogger(ZKCuratorManager.class);

  /** Charset used to convert ZNode data to and from Strings. */
  private static final Charset UTF_8 = Charset.forName("UTF-8");

  /** Configuration for the ZooKeeper connection. */
  private final Configuration conf;

  /** Curator for ZooKeeper; {@code null} until the manager is started. */
  private CuratorFramework curator;

  /**
   * Create a manager that reads its connection settings from a configuration.
   * No connection is opened until {@link #start()} is called.
   * @param config Configuration for the ZooKeeper connection.
   * @throws IOException Declared for callers; not thrown by this constructor.
   */
  public ZKCuratorManager(Configuration config) throws IOException {
    this.conf = config;
  }

  /**
   * Get the curator framework managing the ZooKeeper connection.
   * @return Curator framework, or {@code null} if not started yet.
   */
  public CuratorFramework getCurator() {
    return curator;
  }

  /**
   * Close the connection with ZooKeeper. Does nothing if the manager was
   * never started.
   */
  public void close() {
    if (curator != null) {
      curator.close();
    }
  }

  /**
   * Utility method to fetch the ZK ACLs from the configuration.
   * @param conf Configuration to read {@link CommonConfigurationKeys#ZK_ACL}
   *             from.
   * @return The parsed list of ACLs.
   * @throws java.io.IOException if the Zookeeper ACLs configuration file
   *                             cannot be read
   */
  public static List<ACL> getZKAcls(Configuration conf) throws IOException {
    // Parse authentication from configuration.
    String zkAclConf = conf.get(CommonConfigurationKeys.ZK_ACL,
        CommonConfigurationKeys.ZK_ACL_DEFAULT);
    try {
      zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
      return ZKUtil.parseACLs(zkAclConf);
    } catch (IOException | ZKUtil.BadAclFormatException e) {
      LOG.error("Couldn't read ACLs based on {}",
          CommonConfigurationKeys.ZK_ACL);
      throw e;
    }
  }

  /**
   * Utility method to fetch ZK auth info from the configuration.
   * @param conf Configuration to read {@link CommonConfigurationKeys#ZK_AUTH}
   *             from.
   * @return The parsed authentication entries; an empty list if none are
   *         configured.
   * @throws java.io.IOException if the Zookeeper auth configuration file
   *                             cannot be read
   */
  public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
      throws IOException {
    String zkAuthConf = conf.get(CommonConfigurationKeys.ZK_AUTH);
    try {
      zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
      if (zkAuthConf != null) {
        return ZKUtil.parseAuth(zkAuthConf);
      } else {
        return Collections.emptyList();
      }
    } catch (IOException | ZKUtil.BadAuthFormatException e) {
      LOG.error("Couldn't read Auth based on {}",
          CommonConfigurationKeys.ZK_AUTH);
      throw e;
    }
  }

  /**
   * Start the connection to the ZooKeeper ensemble using only the
   * authentication entries found in the configuration.
   * @throws IOException If the connection cannot be started.
   */
  public void start() throws IOException {
    this.start(Collections.<AuthInfo>emptyList());
  }

  /**
   * Start the connection to the ZooKeeper ensemble.
   * @param authInfos List of additional authentication keys; may be
   *                  {@code null}. The list itself is not modified.
   * @throws IOException If the connection cannot be started.
   */
  public void start(List<AuthInfo> authInfos) throws IOException {

    // Connect to the ZooKeeper ensemble
    String zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
    if (zkHostPort == null) {
      throw new IOException(
          CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
    }
    int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
        CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
    int zkSessionTimeout = conf.getInt(CommonConfigurationKeys.ZK_TIMEOUT_MS,
        CommonConfigurationKeys.ZK_TIMEOUT_MS_DEFAULT);
    int zkRetryInterval = conf.getInt(
        CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS,
        CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS_DEFAULT);
    RetryNTimes retryPolicy = new RetryNTimes(numRetries, zkRetryInterval);

    // Set up ZK auths. Work on a private copy so that the caller's list is
    // never mutated and immutable lists are accepted.
    List<ZKUtil.ZKAuthInfo> zkAuths = getZKAuths(conf);
    List<AuthInfo> allAuthInfos = new ArrayList<>();
    if (authInfos != null) {
      allAuthInfos.addAll(authInfos);
    }
    for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
      allAuthInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
    }

    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(zkHostPort)
        .sessionTimeoutMs(zkSessionTimeout)
        .retryPolicy(retryPolicy)
        .authorization(allAuthInfos)
        .build();
    client.start();

    this.curator = client;
  }

  /**
   * Get ACLs for a ZNode.
   * @param path Path of the ZNode.
   * @return The list of ACLs.
   * @throws Exception If it cannot contact Zookeeper.
   */
  public List<ACL> getACL(final String path) throws Exception {
    return curator.getACL().forPath(path);
  }

  /**
   * Get the data in a ZNode.
   * @param path Path of the ZNode.
   * @return The data in the ZNode.
   * @throws Exception If it cannot contact Zookeeper.
   */
  public byte[] getData(final String path) throws Exception {
    return curator.getData().forPath(path);
  }

  /**
   * Get the data in a ZNode as a UTF-8 String.
   * @param path Path of the ZNode.
   * @return The data in the ZNode, or {@code null} if {@link #getData}
   *         returned {@code null}.
   * @throws Exception If it cannot contact Zookeeper.
   */
  public String getStringData(final String path) throws Exception {
    byte[] bytes = getData(path);
    // create() in this class writes null data, so guard against it here
    // instead of failing with a NullPointerException.
    if (bytes == null) {
      return null;
    }
    return new String(bytes, UTF_8);
  }

  /**
   * Get the data in a ZNode as a UTF-8 String.
   * @param path Path of the ZNode.
   * @return The data in the ZNode.
   * @throws Exception If it cannot contact Zookeeper.
   * @deprecated Misspelled name kept for existing callers; use
   *             {@link #getStringData(String)} instead.
   */
  @Deprecated
  public String getSringData(final String path) throws Exception {
    return getStringData(path);
  }

  /**
   * Set data into a ZNode.
   * @param path Path of the ZNode.
   * @param data Data to set.
   * @param version Version of the data to store.
   * @throws Exception If it cannot contact Zookeeper.
   */
  public void setData(String path, byte[] data, int version) throws Exception {
    curator.setData().withVersion(version).forPath(path, data);
  }

  /**
   * Set data into a ZNode.
   * @param path Path of the ZNode.
   * @param data Data to set as String; encoded as UTF-8.
   * @param version Version of the data to store.
   * @throws Exception If it cannot contact Zookeeper.
   */
  public void setData(String path, String data, int version) throws Exception {
    byte[] bytes = data.getBytes(UTF_8);
    setData(path, bytes, version);
  }

  /**
   * Get children of a ZNode.
   * @param path Path of the ZNode.
   * @return The list of children.
   * @throws Exception If it cannot contact Zookeeper.
   */
  public List<String> getChildren(final String path) throws Exception {
    return curator.getChildren().forPath(path);
  }

  /**
   * Check if a ZNode exists.
   * @param path Path of the ZNode.
   * @return If the ZNode exists.
   * @throws Exception If it cannot contact Zookeeper.
   */
  public boolean exists(final String path) throws Exception {
    return curator.checkExists().forPath(path) != null;
  }

  /**
   * Create a ZNode without passing an explicit ACL.
   * @param path Path of the ZNode.
   * @return If the ZNode was created.
   * @throws Exception If it cannot contact Zookeeper.
   */
  public boolean create(final String path) throws Exception {
    return create(path, null);
  }

  /**
   * Create a persistent ZNode with no data if it does not exist yet.
   * @param path Path of the ZNode.
   * @param zkAcl ACL for the node.
   * @return If the ZNode was created; false if it already existed.
   * @throws Exception If it cannot contact Zookeeper.
   */
  public boolean create(final String path, List<ACL> zkAcl) throws Exception {
    // NOTE: the existence check and the creation are two separate calls, so
    // this is not atomic with respect to other clients creating the node.
    boolean created = false;
    if (!exists(path)) {
      curator.create()
          .withMode(CreateMode.PERSISTENT)
          .withACL(zkAcl)
          .forPath(path, null);
      created = true;
    }
    return created;
  }

  /**
   * Delete a ZNode and its children, if the ZNode exists.
   * @param path Path of the ZNode.
   * @throws Exception If it cannot contact ZooKeeper.
   */
  public void delete(final String path) throws Exception {
    if (exists(path)) {
      curator.delete().deletingChildrenIfNeeded().forPath(path);
    }
  }

  /**
   * Get the path for a ZNode.
   * @param root Root of the ZNode.
   * @param nodeName Name of the ZNode.
   * @return Path for the ZNode, built as {@code root + "/" + nodeName}.
   */
  public static String getNodePath(String root, String nodeName) {
    return root + "/" + nodeName;
  }
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This package provides utilities to interact with Curator ZooKeeper.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
package org.apache.hadoop.util.curator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -2764,4 +2764,50 @@
This determines the number of open file handles.
</description>
</property>
<property>
<description>Host:Port of the ZooKeeper server to be used.
</description>
<name>hadoop.zk.address</name>
<!--value>127.0.0.1:2181</value-->
</property>
<property>
<description>Number of tries to connect to ZooKeeper.</description>
<name>hadoop.zk.num-retries</name>
<value>1000</value>
</property>
<property>
<description>Retry interval in milliseconds when connecting to ZooKeeper.
</description>
<name>hadoop.zk.retry-interval-ms</name>
<value>1000</value>
</property>
<property>
<description>ZooKeeper session timeout in milliseconds. Session expiration
is managed by the ZooKeeper cluster itself, not by the client. This value is
used by the cluster to determine when the client's session expires.
Expiration happens when the cluster does not hear from the client within
the specified session timeout period (i.e. no heartbeat).</description>
<name>hadoop.zk.timeout-ms</name>
<value>10000</value>
</property>
<property>
<description>ACLs to be used for ZooKeeper znodes.</description>
<name>hadoop.zk.acl</name>
<value>world:anyone:rwcda</value>
</property>
<property>
<description>
Specify the auths to be used for the ACLs specified in hadoop.zk.acl.
This takes a comma-separated list of authentication mechanisms, each of the
form 'scheme:auth' (the same syntax used for the 'addAuth' command in
the ZK CLI).
</description>
<name>hadoop.zk.auth</name>
</property>
</configuration>

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.curator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Test the manager for ZooKeeper Curator.
 *
 * <p>Each test runs against its own in-process {@link TestingServer} that is
 * created in {@link #setup()} and shut down in {@link #teardown()}.
 */
public class TestZKCuratorManager {

  /** In-process ZooKeeper server the manager connects to. */
  private TestingServer server;
  /** Manager under test. */
  private ZKCuratorManager curator;

  /**
   * Start a test server and a manager connected to it.
   * @throws Exception If the server or the manager cannot be started.
   */
  @Before
  public void setup() throws Exception {
    this.server = new TestingServer();

    Configuration conf = new Configuration();
    conf.set(
        CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());

    this.curator = new ZKCuratorManager(conf);
    this.curator.start();
  }

  /**
   * Release the manager and the test server.
   * @throws Exception If the server cannot be closed.
   */
  @After
  public void teardown() throws Exception {
    // Both fields may still be null if setup() failed part-way; guard them so
    // the original failure is not accompanied by a NullPointerException.
    if (this.curator != null) {
      this.curator.close();
      this.curator = null;
    }
    if (this.server != null) {
      this.server.close();
      this.server = null;
    }
  }

  /**
   * A String written to a ZNode must be read back unchanged.
   * @throws Exception If ZooKeeper cannot be contacted.
   */
  @Test
  public void testReadWriteData() throws Exception {
    String testZNode = "/test";
    String expectedString = "testString";
    assertFalse(curator.exists(testZNode));
    curator.create(testZNode);
    assertTrue(curator.exists(testZNode));
    // Version -1 is passed so the write is accepted for the new node
    curator.setData(testZNode, expectedString, -1);
    String testString = curator.getSringData(testZNode);
    assertEquals(expectedString, testString);
  }

  /**
   * Creating and deleting ZNodes must be reflected in the children of the
   * root.
   * @throws Exception If ZooKeeper cannot be contacted.
   */
  @Test
  public void testChildren() throws Exception {
    // The root is expected to start with a single child (presumably the
    // server's own bookkeeping node).
    List<String> children = curator.getChildren("/");
    assertEquals(1, children.size());

    assertFalse(curator.exists("/node1"));
    curator.create("/node1");
    assertTrue(curator.exists("/node1"));

    assertFalse(curator.exists("/node2"));
    curator.create("/node2");
    assertTrue(curator.exists("/node2"));

    children = curator.getChildren("/");
    assertEquals(3, children.size());

    curator.delete("/node2");
    assertFalse(curator.exists("/node2"));
    children = curator.getChildren("/");
    assertEquals(2, children.size());
  }
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
@ -83,7 +84,17 @@ public class YarnConfiguration extends Configuration {
/**
 * Register deprecated configuration keys together with the keys that
 * replace them.
 */
private static void addDeprecatedKeys() {
Configuration.addDeprecations(new DeprecationDelta[] {
new DeprecationDelta("yarn.client.max-nodemanagers-proxies",
NM_CLIENT_MAX_NM_PROXIES)
NM_CLIENT_MAX_NM_PROXIES),
// RM ZooKeeper settings are replaced by the keys declared in
// CommonConfigurationKeys.
new DeprecationDelta(RM_ZK_ACL, CommonConfigurationKeys.ZK_ACL),
new DeprecationDelta(RM_ZK_AUTH, CommonConfigurationKeys.ZK_AUTH),
new DeprecationDelta(RM_ZK_ADDRESS,
CommonConfigurationKeys.ZK_ADDRESS),
new DeprecationDelta(RM_ZK_NUM_RETRIES,
CommonConfigurationKeys.ZK_NUM_RETRIES),
new DeprecationDelta(RM_ZK_TIMEOUT_MS,
CommonConfigurationKeys.ZK_TIMEOUT_MS),
new DeprecationDelta(RM_ZK_RETRY_INTERVAL_MS,
CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS),
});
}

View File

@ -78,6 +78,15 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
// Ignore all YARN Application Timeline Service (version 1) properties
configurationPrefixToSkipCompare.add("yarn.timeline-service.");
// skip deprecated ZooKeeper settings
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_ADDRESS);
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_NUM_RETRIES);
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_TIMEOUT_MS);
configurationPropsToSkipCompare.add(
YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS);
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_AUTH);
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_ACL);
// Used as Java command line properties, not XML
configurationPrefixToSkipCompare.add("yarn.app.container");

View File

@ -437,31 +437,6 @@
<value>${yarn.resourcemanager.max-completed-applications}</value>
</property>
<property>
<description>Host:Port of the ZooKeeper server to be used by the RM. This
must be supplied when using the ZooKeeper based implementation of the
RM state store and/or embedded automatic failover in an HA setting.
</description>
<name>yarn.resourcemanager.zk-address</name>
<!--value>127.0.0.1:2181</value-->
</property>
<property>
<description>Number of times RM tries to connect to ZooKeeper.</description>
<name>yarn.resourcemanager.zk-num-retries</name>
<value>1000</value>
</property>
<property>
<description>Retry interval in milliseconds when connecting to ZooKeeper.
When HA is enabled, the value here is NOT used. It is generated
automatically from yarn.resourcemanager.zk-timeout-ms and
yarn.resourcemanager.zk-num-retries.
</description>
<name>yarn.resourcemanager.zk-retry-interval-ms</name>
<value>1000</value>
</property>
<property>
<description>Full path of the ZooKeeper znode where RM state will be
stored. This must be supplied when using
@ -471,22 +446,6 @@
<value>/rmstore</value>
</property>
<property>
<description>ZooKeeper session timeout in milliseconds. Session expiration
is managed by the ZooKeeper cluster itself, not by the client. This value is
used by the cluster to determine when the client's session expires.
Expirations happens when the cluster does not hear from the client within
the specified session timeout period (i.e. no heartbeat).</description>
<name>yarn.resourcemanager.zk-timeout-ms</name>
<value>10000</value>
</property>
<property>
<description>ACL's to be used for ZooKeeper znodes.</description>
<name>yarn.resourcemanager.zk-acl</name>
<value>world:anyone:rwcda</value>
</property>
<property>
<description>
ACLs to be used for the root znode when using ZKRMStateStore in an HA
@ -512,18 +471,6 @@
<name>yarn.resourcemanager.zk-state-store.root-node.acl</name>
</property>
<property>
<description>
Specify the auths to be used for the ACL's specified in both the
yarn.resourcemanager.zk-acl and
yarn.resourcemanager.zk-state-store.root-node.acl properties. This
takes a comma-separated list of authentication mechanisms, each of the
form 'scheme:auth' (the same syntax used for the 'addAuth' command in
the ZK CLI).
</description>
<name>yarn.resourcemanager.zk-auth</name>
</property>
<property>
<description>URI pointing to the location of the FileSystem path where
RM state will be stored. This must be supplied when using

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -96,8 +97,8 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);
int maxRetryNum =
conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf

View File

@ -1,81 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.zookeeper.data.ACL;
import java.util.Collections;
import java.util.List;
/**
 * Static helpers that read the ResourceManager's ZooKeeper ACL and
 * authentication settings from a configuration.
 */
@InterfaceAudience.Private
public class RMZKUtils {

  private static final Log LOG = LogFactory.getLog(RMZKUtils.class);

  /**
   * Read the ZooKeeper ACLs named by {@link YarnConfiguration#RM_ZK_ACL},
   * falling back to {@link YarnConfiguration#DEFAULT_RM_ZK_ACL}.
   *
   * @param conf configuration to read the setting from
   * @return the parsed ACLs
   * @throws java.io.IOException if the Zookeeper ACLs configuration file
   * cannot be read
   */
  public static List<ACL> getZKAcls(Configuration conf) throws IOException {
    final String configuredAcls = conf.get(
        YarnConfiguration.RM_ZK_ACL, YarnConfiguration.DEFAULT_RM_ZK_ACL);
    try {
      final String resolvedAcls =
          ZKUtil.resolveConfIndirection(configuredAcls);
      return ZKUtil.parseACLs(resolvedAcls);
    } catch (IOException | ZKUtil.BadAclFormatException e) {
      // Log which key was at fault, then let the caller see the failure.
      LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
      throw e;
    }
  }

  /**
   * Read the ZooKeeper auth entries named by
   * {@link YarnConfiguration#RM_ZK_AUTH}.
   *
   * @param conf configuration to read the setting from
   * @return the parsed auth entries, or an empty list when the resolved
   * setting is null
   * @throws java.io.IOException if the Zookeeper ACLs configuration file
   * cannot be read
   */
  public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
      throws IOException {
    final String configuredAuths = conf.get(YarnConfiguration.RM_ZK_AUTH);
    try {
      final String resolvedAuths =
          ZKUtil.resolveConfIndirection(configuredAuths);
      if (resolvedAuths == null) {
        return Collections.emptyList();
      }
      return ZKUtil.parseAuth(resolvedAuths);
    } catch (IOException | ZKUtil.BadAuthFormatException e) {
      // Log which key was at fault, then let the caller see the failure.
      LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
      throw e;
    }
  }
}

View File

@ -23,8 +23,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
@ -44,7 +42,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -173,7 +171,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected ResourceTrackerService resourceTracker;
private JvmMetrics jvmMetrics;
private boolean curatorEnabled = false;
private CuratorFramework curator;
private ZKCuratorManager zkManager;
private final String zkRootNodePassword =
Long.toString(new SecureRandom().nextLong());
private boolean recoveryEnabled;
@ -316,7 +314,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.curator = createAndStartCurator(conf);
this.zkManager = createAndStartZKManager(conf);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(this);
@ -324,50 +322,49 @@ public class ResourceManager extends CompositeService implements Recoverable {
return elector;
}
public CuratorFramework createAndStartCurator(Configuration conf)
/**
* Create and start the ZooKeeper Curator manager.
* @param config Configuration for the ZooKeeper curator.
* @return New ZooKeeper Curator manager.
* @throws IOException If it cannot create the manager.
*/
public ZKCuratorManager createAndStartZKManager(Configuration config)
throws IOException {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkHostPort == null) {
throw new YarnRuntimeException(
YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
}
int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
ZKCuratorManager manager = new ZKCuratorManager(config);
// set up zk auths
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
// Get authentication
List<AuthInfo> authInfos = new ArrayList<>();
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
if (HAUtil.isHAEnabled(config) && HAUtil.getConfValueForRMInstance(
YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, config) == null) {
String zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, config);
String defaultFencingAuth =
zkRootNodeUsername + ":" + zkRootNodePassword;
byte[] defaultFencingAuthData =
defaultFencingAuth.getBytes(Charset.forName("UTF-8"));
String scheme = new DigestAuthenticationProvider().getScheme();
AuthInfo authInfo = new AuthInfo(scheme, defaultFencingAuthData);
authInfos.add(authInfo);
}
if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
String zkRootNodeUsername = HAUtil
.getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
byte[] defaultFencingAuth =
(zkRootNodeUsername + ":" + zkRootNodePassword)
.getBytes(Charset.forName("UTF-8"));
authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
defaultFencingAuth));
}
manager.start(authInfos);
return manager;
}
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkHostPort)
.sessionTimeoutMs(zkSessionTimeout)
.retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
.authorization(authInfos).build();
client.start();
return client;
/**
* Get the ZooKeeper Curator manager.
* @return ZooKeeper Curator manager.
*/
public ZKCuratorManager getZKManager() {
return this.zkManager;
}
public CuratorFramework getCurator() {
return this.curator;
if (this.zkManager == null) {
return null;
}
return this.zkManager.getCurator();
}
public String getZkRootNodePassword() {
@ -1119,8 +1116,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
configurationProvider.close();
}
super.serviceStop();
if (curator != null) {
curator.close();
if (zkManager != null) {
zkManager.close();
}
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -46,7 +47,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@ -201,8 +201,8 @@ public class ZKRMStateStore extends RMStateStore {
private final String zkRootNodeAuthScheme =
new DigestAuthenticationProvider().getScheme();
@VisibleForTesting
protected CuratorFramework curatorFramework;
/** Manager for the ZooKeeper connection. */
private ZKCuratorManager zkManager;
/*
* Indicates different app attempt state store operations.
@ -298,12 +298,11 @@ public class ZKRMStateStore extends RMStateStore {
appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
}
zkAcl = RMZKUtils.getZKAcls(conf);
zkAcl = ZKCuratorManager.getZKAcls(conf);
if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
if (zkRootNodeAclConf != null) {
zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);
@ -330,10 +329,9 @@ public class ZKRMStateStore extends RMStateStore {
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
curatorFramework = resourceManager.getCurator();
if (curatorFramework == null) {
curatorFramework = resourceManager.createAndStartCurator(conf);
zkManager = resourceManager.getZKManager();
if (zkManager == null) {
zkManager = resourceManager.createAndStartZKManager(conf);
}
}
@ -382,6 +380,7 @@ public class ZKRMStateStore extends RMStateStore {
logRootNodeAcls("Before setting ACLs'\n");
}
CuratorFramework curatorFramework = zkManager.getCurator();
if (HAUtil.isHAEnabled(getConfig())) {
curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
} else {
@ -401,6 +400,7 @@ public class ZKRMStateStore extends RMStateStore {
}
if (!HAUtil.isHAEnabled(getConfig())) {
CuratorFramework curatorFramework = zkManager.getCurator();
IOUtils.closeStream(curatorFramework);
}
}
@ -937,6 +937,7 @@ public class ZKRMStateStore extends RMStateStore {
}
safeDelete(appIdRemovePath);
} else {
CuratorFramework curatorFramework = zkManager.getCurator();
curatorFramework.delete().deletingChildrenIfNeeded().
forPath(appIdRemovePath);
}
@ -1237,38 +1238,32 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
byte[] getData(final String path) throws Exception {
return curatorFramework.getData().forPath(path);
return zkManager.getData(path);
}
@VisibleForTesting
List<ACL> getACL(final String path) throws Exception {
return curatorFramework.getACL().forPath(path);
return zkManager.getACL(path);
}
@VisibleForTesting
List<String> getChildren(final String path) throws Exception {
return curatorFramework.getChildren().forPath(path);
return zkManager.getChildren(path);
}
@VisibleForTesting
boolean exists(final String path) throws Exception {
return curatorFramework.checkExists().forPath(path) != null;
return zkManager.exists(path);
}
@VisibleForTesting
void create(final String path) throws Exception {
if (!exists(path)) {
curatorFramework.create()
.withMode(CreateMode.PERSISTENT).withACL(zkAcl)
.forPath(path, null);
}
zkManager.create(path, zkAcl);
}
@VisibleForTesting
void delete(final String path) throws Exception {
if (exists(path)) {
curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
}
zkManager.delete(path);
}
private void safeCreate(String path, byte[] data, List<ACL> acl,
@ -1311,6 +1306,7 @@ public class ZKRMStateStore extends RMStateStore {
private CuratorTransactionFinal transactionFinal;
SafeTransaction() throws Exception {
CuratorFramework curatorFramework = zkManager.getCurator();
CuratorTransaction transaction = curatorFramework.inTransaction();
transactionFinal = transaction.create()
.withMode(CreateMode.PERSISTENT).withACL(zkAcl)

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -61,8 +62,8 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
configuration.set(YarnConfiguration.RM_STORE,
ZKRMStateStore.class.getName());
configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
configuration.set(CommonConfigurationKeys.ZK_ADDRESS, hostPort);
configuration.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
int base = 100;