diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
index 89af8d06f1a..11d46574b23 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
@@ -53,6 +53,8 @@ import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Copy-paste of ClientBase from ZooKeeper, but without any of the
* JMXEnv verification. There seems to be a bug ZOOKEEPER-1438
@@ -111,7 +113,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
synchronized boolean isConnected() {
return connected;
}
- synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException {
+ @VisibleForTesting
+ public synchronized void waitForConnected(long timeout)
+ throws InterruptedException, TimeoutException {
long expire = Time.now() + timeout;
long left = timeout;
while(!connected && left > 0) {
@@ -123,7 +127,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
}
}
- synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException {
+ @VisibleForTesting
+ public synchronized void waitForDisconnected(long timeout)
+ throws InterruptedException, TimeoutException {
long expire = Time.now() + timeout;
long left = timeout;
while(connected && left > 0) {
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5bd27252eb2..30e678c71cc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -12,10 +12,15 @@ Release 2.3.0 - UNRELEASED
IMPROVEMENTS
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
+
YARN-1098. Separate out RM services into Always On and Active (Karthik
Kambatla via bikas)
+
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
+ YARN-353. Add Zookeeper-based store implementation for RMStateStore.
+ (Bikas Saha, Jian He and Karthik Kambatla via hitesh)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8142b78d58e..da23133cfa5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -276,12 +276,40 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
public static final boolean DEFAULT_RM_HA_ENABLED = false;
+
+ ////////////////////////////////
+ // RM state store configs
+ ////////////////////////////////
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
/** URI for FileSystemRMStateStore */
public static final String FS_RM_STATE_STORE_URI =
RM_PREFIX + "fs.state-store.uri";
+ /**
+ * Comma separated host:port pairs, each corresponding to a ZK server for
+ * ZKRMStateStore
+ */
+ public static final String ZK_STATE_STORE_PREFIX =
+ RM_PREFIX + "zk.state-store.";
+ public static final String ZK_RM_STATE_STORE_NUM_RETRIES =
+ ZK_STATE_STORE_PREFIX + "num-retries";
+ public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3;
+ public static final String ZK_RM_STATE_STORE_ADDRESS =
+ ZK_STATE_STORE_PREFIX + "address";
+ /** Timeout in millisec for ZK server connection for ZKRMStateStore */
+ public static final String ZK_RM_STATE_STORE_TIMEOUT_MS =
+ ZK_STATE_STORE_PREFIX + "timeout.ms";
+ public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000;
+ /** Parent znode path under which ZKRMStateStore will create znodes */
+ public static final String ZK_RM_STATE_STORE_PARENT_PATH =
+ ZK_STATE_STORE_PREFIX + "parent-path";
+ public static final String DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH = "/rmstore";
+ /** ACL for znodes in ZKRMStateStore */
+ public static final String ZK_RM_STATE_STORE_ACL =
+ ZK_STATE_STORE_PREFIX + "acl";
+ public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
+ "world:anyone:rwcda";
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c0f3a1ec417..8aa7d01735b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -258,6 +258,51 @@
org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
+
+ Host:Port of the ZooKeeper server where RM state will
+ be stored. This must be supplied when using
+ org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+ as the value for yarn.resourcemanager.store.class
+ yarn.resourcemanager.zk.state-store.address
+
+
+
+
+ Number of times ZKRMStateStore tries to connect to
+ ZooKeeper. This may be supplied when using
+ org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+ as the value for yarn.resourcemanager.store.class
+ yarn.resourcemanager.zk.state-store.num-retries
+ 3
+
+
+
+ Full path of the ZooKeeper znode where RM state will be
+ stored. This must be supplied when using
+ org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+ as the value for yarn.resourcemanager.store.class
+ yarn.resourcemanager.zk.state-store.parent-path
+ /rmstore
+
+
+
+ Timeout when connecting to ZooKeeper.
+ This may be supplied when using
+ org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+ as the value for yarn.resourcemanager.store.class
+ yarn.resourcemanager.zk.state-store.timeout.ms
+ 60000
+
+
+
+ ACL's to be used for ZooKeeper znodes.
+ This may be supplied when using
+ org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+ as the value for yarn.resourcemanager.store.class
+ yarn.resourcemanager.zk.state-store.acl
+ world:anyone:rwcda
+
+
URI pointing to the location of the FileSystem path where
RM state will be stored. This must be supplied when using
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 578b20cb7d2..6cf26f11a30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -41,6 +41,16 @@
org.apache.hadoop
hadoop-yarn-server-web-proxy
+
+ org.apache.zookeeper
+ zookeeper
+
+
+ org.apache.zookeeper
+ zookeeper
+ test-jar
+ test
+
org.apache.hadoop
hadoop-hdfs
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 30d5d4108d4..062f5cc5532 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -63,12 +63,6 @@ public class FileSystemRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
private static final String ROOT_DIR_NAME = "FSRMStateRoot";
- private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
- private static final String RM_APP_ROOT = "RMAppRoot";
- private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
- private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
- private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
- "RMDTSequenceNumber_";
protected FileSystem fs;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 382ed97d61b..2f4b8960205 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -65,6 +65,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
*/
public abstract class RMStateStore extends AbstractService {
+ // constants for RM App state and RMDTSecretManagerState.
+ protected static final String RM_APP_ROOT = "RMAppRoot";
+ protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
+ protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
+ protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
+ protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
+ "RMDTSequenceNumber_";
+
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
public RMStateStore() {
@@ -464,8 +472,9 @@ public abstract class RMStateStore extends AbstractService {
(ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
.newApplicationAttemptStateData(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens);
-
- LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
+ }
storeApplicationAttemptState(attemptState.getAttemptId().toString(),
attemptStateData);
} catch (Exception e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
new file mode 100644
index 00000000000..41c95d3ff85
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Private
+@Unstable
+public class ZKRMStateStore extends RMStateStore {
+
+ public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
+
+ private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
+ private int numRetries;
+
+ private String zkHostPort = null;
+ private int zkSessionTimeout;
+ private List zkAcl;
+ private String zkRootNodePath;
+ private String rmDTSecretManagerRoot;
+ private String rmAppRoot;
+ private String dtSequenceNumberPath = null;
+
+ @VisibleForTesting
+ protected String znodeWorkingPath;
+
+ @VisibleForTesting
+ protected ZooKeeper zkClient;
+ private ZooKeeper oldZkClient;
+
+ @Override
+ public synchronized void initInternal(Configuration conf) throws Exception {
+ zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS);
+ if (zkHostPort == null) {
+ throw new YarnRuntimeException("No server address specified for " +
+ "zookeeper state store for Resource Manager recovery. " +
+ YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS + " is not configured.");
+ }
+ numRetries =
+ conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_NUM_RETRIES,
+ YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES);
+ znodeWorkingPath =
+ conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
+ YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
+ zkSessionTimeout =
+ conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS,
+ YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS);
+ // Parse authentication from configuration.
+ String zkAclConf =
+ conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL,
+ YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_ACL);
+ zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
+
+ try {
+ zkAcl = ZKUtil.parseACLs(zkAclConf);
+ } catch (ZKUtil.BadAclFormatException bafe) {
+ LOG.error("Invalid format for " + YarnConfiguration.ZK_RM_STATE_STORE_ACL);
+ throw bafe;
+ }
+
+ zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
+ rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
+ rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
+ }
+
+ @Override
+ public synchronized void startInternal() throws Exception {
+ // createConnection for future API calls
+ createConnection();
+
+ // ensure root dirs exist
+ createRootDir(znodeWorkingPath);
+ createRootDir(zkRootNodePath);
+ createRootDir(rmDTSecretManagerRoot);
+ createRootDir(rmAppRoot);
+ }
+
+ private void createRootDir(String rootPath) throws Exception {
+ try {
+ createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+ } catch (KeeperException ke) {
+ if (ke.code() != Code.NODEEXISTS) {
+ throw ke;
+ }
+ }
+ }
+
+ private synchronized void closeZkClients() throws IOException {
+ if (zkClient != null) {
+ try {
+ zkClient.close();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while closing ZK", e);
+ }
+ zkClient = null;
+ }
+ if (oldZkClient != null) {
+ try {
+ oldZkClient.close();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while closing old ZK", e);
+ }
+ oldZkClient = null;
+ }
+ }
+
+ @Override
+ protected synchronized void closeInternal() throws Exception {
+ closeZkClients();
+ }
+
+ @Override
+ public synchronized RMState loadState() throws Exception {
+ RMState rmState = new RMState();
+ // recover DelegationTokenSecretManager
+ loadRMDTSecretManagerState(rmState);
+ // recover RM applications
+ loadRMAppState(rmState);
+ return rmState;
+ }
+
+ private synchronized void loadRMDTSecretManagerState(RMState rmState)
+ throws Exception {
+ List childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true);
+
+ for (String childNodeName : childNodes) {
+ if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
+ rmState.rmSecretManagerState.dtSequenceNumber =
+ Integer.parseInt(childNodeName.split("_")[1]);
+ continue;
+ }
+ String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
+ byte[] childData = getDataWithRetries(childNodePath, true);
+
+ ByteArrayInputStream is = new ByteArrayInputStream(childData);
+ DataInputStream fsIn = new DataInputStream(is);
+ try {
+ if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
+ DelegationKey key = new DelegationKey();
+ key.readFields(fsIn);
+ rmState.rmSecretManagerState.masterKeyState.add(key);
+ } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+ RMDelegationTokenIdentifier identifier =
+ new RMDelegationTokenIdentifier();
+ identifier.readFields(fsIn);
+ long renewDate = fsIn.readLong();
+ rmState.rmSecretManagerState.delegationTokenState.put(identifier,
+ renewDate);
+ }
+ } finally {
+ is.close();
+ }
+ }
+ }
+
+ private synchronized void loadRMAppState(RMState rmState) throws Exception {
+ List childNodes = zkClient.getChildren(rmAppRoot, true);
+ List attempts =
+ new ArrayList();
+ for (String childNodeName : childNodes) {
+ String childNodePath = getNodePath(rmAppRoot, childNodeName);
+ byte[] childData = getDataWithRetries(childNodePath, true);
+ if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+ // application
+ LOG.info("Loading application from znode: " + childNodeName);
+ ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
+ ApplicationStateDataPBImpl appStateData =
+ new ApplicationStateDataPBImpl(
+ ApplicationStateDataProto.parseFrom(childData));
+ ApplicationState appState =
+ new ApplicationState(appStateData.getSubmitTime(),
+ appStateData.getApplicationSubmissionContext(),
+ appStateData.getUser());
+ if (!appId.equals(appState.context.getApplicationId())) {
+ throw new YarnRuntimeException("The child node name is different " +
+ "from the application id");
+ }
+ rmState.appState.put(appId, appState);
+ } else if (childNodeName
+ .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+ // attempt
+ LOG.info("Loading application attempt from znode: " + childNodeName);
+ ApplicationAttemptId attemptId =
+ ConverterUtils.toApplicationAttemptId(childNodeName);
+ ApplicationAttemptStateDataPBImpl attemptStateData =
+ new ApplicationAttemptStateDataPBImpl(
+ ApplicationAttemptStateDataProto.parseFrom(childData));
+ Credentials credentials = null;
+ if (attemptStateData.getAppAttemptTokens() != null) {
+ credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(attemptStateData.getAppAttemptTokens());
+ credentials.readTokenStorageStream(dibb);
+ }
+ ApplicationAttemptState attemptState =
+ new ApplicationAttemptState(attemptId,
+ attemptStateData.getMasterContainer(), credentials);
+ if (!attemptId.equals(attemptState.getAttemptId())) {
+ throw new YarnRuntimeException("The child node name is different " +
+ "from the application attempt id");
+ }
+ attempts.add(attemptState);
+ } else {
+ LOG.info("Unknown child node with name: " + childNodeName);
+ }
+ }
+
+ // go through all attempts and add them to their apps
+ for (ApplicationAttemptState attemptState : attempts) {
+ ApplicationId appId = attemptState.getAttemptId().getApplicationId();
+ ApplicationState appState = rmState.appState.get(appId);
+ if (appState != null) {
+ appState.attempts.put(attemptState.getAttemptId(), attemptState);
+ } else {
+ // the application znode may have been removed when the application
+ // completed but the RM might have stopped before it could remove the
+ // application attempt znodes
+ LOG.info("Application node not found for attempt: "
+ + attemptState.getAttemptId());
+ deleteWithRetries(
+ getNodePath(rmAppRoot, attemptState.getAttemptId().toString()),
+ 0);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void storeApplicationState(
+ String appId, ApplicationStateDataPBImpl appStateDataPB) throws
+ Exception {
+ String nodeCreatePath = getNodePath(rmAppRoot, appId);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
+ }
+ byte[] appStateData = appStateDataPB.getProto().toByteArray();
+ createWithRetries(
+ nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
+ }
+
+ @Override
+ public synchronized void storeApplicationAttemptState(
+ String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ throws Exception {
+ String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing info for attempt: " + attemptId + " at: "
+ + nodeCreatePath);
+ }
+ byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
+ createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
+ CreateMode.PERSISTENT);
+ }
+
+ @Override
+ public synchronized void removeApplicationState(ApplicationState appState)
+ throws Exception {
+ String appId = appState.getAppId().toString();
+ String nodeRemovePath = getNodePath(rmAppRoot, appId);
+ ArrayList opList = new ArrayList();
+ opList.add(Op.delete(nodeRemovePath, 0));
+
+ for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
+ String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
+ opList.add(Op.delete(attemptRemovePath, 0));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
+ + " and its attempts.");
+ }
+ doMultiWithRetries(opList);
+ }
+
+ @Override
+ protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
+ RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+ int latestSequenceNumber) throws Exception {
+ ArrayList opList = new ArrayList();
+ // store RM delegation token
+ String nodeCreatePath =
+ getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+ + rmDTIdentifier.getSequenceNumber());
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ DataOutputStream fsOut = new DataOutputStream(os);
+ try {
+ rmDTIdentifier.write(fsOut);
+ fsOut.writeLong(renewDate);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing RMDelegationToken_" +
+ rmDTIdentifier.getSequenceNumber());
+ }
+ opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
+ CreateMode.PERSISTENT));
+ } finally {
+ os.close();
+ }
+
+ // store sequence number
+ String latestSequenceNumberPath =
+ getNodePath(rmDTSecretManagerRoot,
+ DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
+ latestSequenceNumber);
+ }
+
+ if (dtSequenceNumberPath != null) {
+ opList.add(Op.delete(dtSequenceNumberPath, 0));
+ }
+ opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
+ CreateMode.PERSISTENT));
+ dtSequenceNumberPath = latestSequenceNumberPath;
+ doMultiWithRetries(opList);
+ }
+
+ @Override
+ protected synchronized void removeRMDelegationTokenState(
+ RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
+ String nodeRemovePath =
+ getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+ + rmDTIdentifier.getSequenceNumber());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing RMDelegationToken_"
+ + rmDTIdentifier.getSequenceNumber());
+ }
+ deleteWithRetries(nodeRemovePath, 0);
+ }
+
+ @Override
+ protected synchronized void storeRMDTMasterKeyState(
+ DelegationKey delegationKey) throws Exception {
+ String nodeCreatePath =
+ getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+ + delegationKey.getKeyId());
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ DataOutputStream fsOut = new DataOutputStream(os);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing RMDelegationKey_" + delegationKey.getKeyId());
+ }
+ delegationKey.write(fsOut);
+ try {
+ createWithRetries(nodeCreatePath, os.toByteArray(), zkAcl,
+ CreateMode.PERSISTENT);
+ } finally {
+ os.close();
+ }
+ }
+
+ @Override
+ protected synchronized void removeRMDTMasterKeyState(
+ DelegationKey delegationKey) throws Exception {
+ String nodeRemovePath =
+ getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+ + delegationKey.getKeyId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
+ }
+ deleteWithRetries(nodeRemovePath, 0);
+ }
+
+ // ZK related code
+ /**
+ * Watcher implementation which forward events to the ZKRMStateStore This
+ * hides the ZK methods of the store from its public interface
+ */
+ private final class ForwardingWatcher implements Watcher {
+
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+ ZKRMStateStore.this.processWatchEvent(event);
+ } catch (Throwable t) {
+ LOG.error("Failed to process watcher event " + event + ": "
+ + StringUtils.stringifyException(t));
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized void processWatchEvent(WatchedEvent event)
+ throws Exception {
+ Event.EventType eventType = event.getType();
+ LOG.info("Watcher event type: " + eventType + " with state:"
+ + event.getState() + " for path:" + event.getPath() + " for " + this);
+
+ if (eventType == Event.EventType.None) {
+
+ // the connection state has changed
+ switch (event.getState()) {
+ case SyncConnected:
+ LOG.info("ZKRMStateStore Session connected");
+ if (oldZkClient != null) {
+ // the SyncConnected must be from the client that sent Disconnected
+ zkClient = oldZkClient;
+ oldZkClient = null;
+ ZKRMStateStore.this.notifyAll();
+ LOG.info("ZKRMStateStore Session restored");
+ }
+ break;
+ case Disconnected:
+ LOG.info("ZKRMStateStore Session disconnected");
+ oldZkClient = zkClient;
+ zkClient = null;
+ break;
+ case Expired:
+ // the connection got terminated because of session timeout
+ // call listener to reconnect
+ LOG.info("Session expired");
+ createConnection();
+ break;
+ default:
+ LOG.error("Unexpected Zookeeper" +
+ " watch event state: " + event.getState());
+ break;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ String getNodePath(String root, String nodeName) {
+ return (root + "/" + nodeName);
+ }
+
+ @VisibleForTesting
+ public String createWithRetries(
+ final String path, final byte[] data, final List acl,
+ final CreateMode mode) throws Exception {
+ return new ZKAction() {
+ @Override
+ public String run() throws KeeperException, InterruptedException {
+ return zkClient.create(path, data, acl, mode);
+ }
+ }.runWithRetries();
+ }
+
+ private void deleteWithRetries(final String path, final int version)
+ throws Exception {
+ new ZKAction() {
+ @Override
+ public Void run() throws KeeperException, InterruptedException {
+ /**
+ * Call exists() to leave a watch on the node denoted by path.
+ * Delete node if exists. To pass the existence information to the
+ * caller, call delete irrespective of whether node exists or not.
+ */
+ if (zkClient.exists(path, true) == null) {
+ LOG.error("Trying to delete a path (" + path
+ + ") that doesn't exist.");
+ }
+ zkClient.delete(path, version);
+ return null;
+ }
+ }.runWithRetries();
+ }
+
+ private void doMultiWithRetries(final ArrayList opList) throws Exception {
+ new ZKAction() {
+ @Override
+ public Void run() throws KeeperException, InterruptedException {
+ zkClient.multi(opList);
+ return null;
+ }
+ }.runWithRetries();
+ }
+
+ @VisibleForTesting
+ public void setDataWithRetries(final String path, final byte[] data,
+ final int version) throws Exception {
+ new ZKAction() {
+ @Override
+ public Void run() throws KeeperException, InterruptedException {
+ zkClient.setData(path, data, version);
+ return null;
+ }
+ }.runWithRetries();
+ }
+
+ @VisibleForTesting
+ public byte[] getDataWithRetries(final String path, final boolean watch)
+ throws Exception {
+ return new ZKAction() {
+ @Override
+ public byte[] run() throws KeeperException, InterruptedException {
+ Stat stat = new Stat();
+ return zkClient.getData(path, watch, stat);
+ }
+ }.runWithRetries();
+ }
+
+ private abstract class ZKAction {
+ // run() expects synchronization on ZKRMStateStore.this
+ abstract T run() throws KeeperException, InterruptedException;
+
+ T runWithCheck() throws Exception {
+ long startTime = System.currentTimeMillis();
+ synchronized (ZKRMStateStore.this) {
+ while (zkClient == null) {
+ ZKRMStateStore.this.wait(zkSessionTimeout);
+ if (zkClient != null) {
+ break;
+ }
+ if (System.currentTimeMillis() - startTime > zkSessionTimeout) {
+ throw new IOException("Wait for ZKClient creation timed out");
+ }
+ }
+ return run();
+ }
+ }
+
+ T runWithRetries() throws Exception {
+ int retry = 0;
+ while (true) {
+ try {
+ return runWithCheck();
+ } catch (KeeperException ke) {
+ if (shouldRetry(ke.code()) && ++retry < numRetries) {
+ continue;
+ }
+ throw ke;
+ }
+ }
+ }
+ }
+
+ private static boolean shouldRetry(Code code) {
+ switch (code) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ return true;
+ default:
+ break;
+ }
+ return false;
+ }
+
+ private synchronized void createConnection()
+ throws IOException, InterruptedException {
+ closeZkClients();
+ for (int retries = 0; retries < numRetries && zkClient == null;
+ retries++) {
+ try {
+ zkClient = getNewZooKeeper();
+ } catch (IOException ioe) {
+ // Retry in case of network failures
+ LOG.info("Failed to connect to the ZooKeeper on attempt - " +
+ (retries + 1));
+ ioe.printStackTrace();
+ }
+ }
+ if (zkClient == null) {
+ LOG.error("Unable to connect to Zookeeper");
+ throw new YarnRuntimeException("Unable to connect to Zookeeper");
+ }
+ ZKRMStateStore.this.notifyAll();
+ LOG.info("Created new ZK connection");
+ }
+
+ // protected to mock for testing
+ @VisibleForTesting
+ protected synchronized ZooKeeper getNewZooKeeper()
+ throws IOException, InterruptedException {
+ ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
+ zk.register(new ForwardingWatcher());
+ return zk;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
index aef92d5fdcc..80d923f7216 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
@@ -26,8 +26,10 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey;
@@ -40,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
@@ -67,13 +70,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import org.apache.zookeeper.ZooKeeper;
+
import org.junit.Test;
-public class TestRMStateStore {
+public class TestRMStateStore extends ClientBaseWithFixes{
public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);
- class TestDispatcher implements Dispatcher, EventHandler {
+ static class TestDispatcher implements
+ Dispatcher, EventHandler {
ApplicationAttemptId attemptId;
Exception storedException;
@@ -82,7 +89,8 @@ public class TestRMStateStore {
@SuppressWarnings("rawtypes")
@Override
- public void register(Class extends Enum> eventType, EventHandler handler) {
+ public void register(Class extends Enum> eventType,
+ EventHandler handler) {
}
@Override
@@ -108,10 +116,18 @@ public class TestRMStateStore {
boolean isFinalStateValid() throws Exception;
}
+ @Test
+ public void testZKRMStateStoreRealZK() throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+ testRMAppStateStore(zkTester);
+ testRMDTSecretManagerStateStore(zkTester);
+ }
+
@Test
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
testRMAppStateStore(fsTester);
@@ -121,6 +137,41 @@ public class TestRMStateStore {
}
}
+ class TestZKRMStateStoreTester implements RMStateStoreHelper {
+ ZooKeeper client;
+ ZKRMStateStore store;
+
+ class TestZKRMStateStore extends ZKRMStateStore {
+ public TestZKRMStateStore(Configuration conf, String workingZnode)
+ throws Exception {
+ init(conf);
+ start();
+ assertTrue(znodeWorkingPath.equals(workingZnode));
+ }
+
+ @Override
+ public ZooKeeper getNewZooKeeper() throws IOException {
+ return client;
+ }
+ }
+
+ public RMStateStore getRMStateStore() throws Exception {
+ String workingZnode = "/Test";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+ this.client = createClient();
+ this.store = new TestZKRMStateStore(conf, workingZnode);
+ return this.store;
+ }
+
+ @Override
+ public boolean isFinalStateValid() throws Exception {
+ List nodes = client.getChildren(store.znodeWorkingPath, false);
+ return nodes.size() == 1;
+ }
+ }
+
class TestFSRMStateStoreTester implements RMStateStoreHelper {
Path workingDirPathURI;
FileSystemRMStateStore store;
@@ -149,7 +200,8 @@ public class TestRMStateStore {
@Override
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
- conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, workingDirPathURI.toString());
+ conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
+ workingDirPathURI.toString());
this.store = new TestFileSystemRMStore(conf);
return store;
}
@@ -158,11 +210,7 @@ public class TestRMStateStore {
public boolean isFinalStateValid() throws Exception {
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(workingDirPathURI);
- if(files.length == 1) {
- // only store root directory should exist
- return true;
- }
- return false;
+ return files.length == 1;
}
}
@@ -183,9 +231,10 @@ public class TestRMStateStore {
dispatcher.notified = false;
}
- void storeApp(RMStateStore store, ApplicationId appId, long time)
- throws Exception {
- ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl();
+ void storeApp(
+ RMStateStore store, ApplicationId appId, long time) throws Exception {
+ ApplicationSubmissionContext context =
+ new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appId);
RMApp mockApp = mock(RMApp.class);
@@ -216,7 +265,8 @@ public class TestRMStateStore {
return container.getId();
}
- void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
+ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
+ throws Exception {
long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore();
@@ -271,7 +321,8 @@ public class TestRMStateStore {
RMApp mockRemovedApp = mock(RMApp.class);
HashMap attempts =
new HashMap();
- ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl();
+ ApplicationSubmissionContext context =
+ new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appIdRemoved);
when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
@@ -288,7 +339,8 @@ public class TestRMStateStore {
// load state
store = stateStoreHelper.getRMStateStore();
RMState state = store.loadState();
- Map rmAppState = state.getApplicationState();
+ Map rmAppState =
+ state.getApplicationState();
ApplicationState appState = rmAppState.get(appId1);
// app is loaded
@@ -362,7 +414,8 @@ public class TestRMStateStore {
store.loadState().getRMDTSecretManagerState();
Assert.assertEquals(token1, secretManagerState.getTokenState());
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
- Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber());
+ Assert.assertEquals(sequenceNumber,
+ secretManagerState.getDTSequenceNumber());
}
private Token generateAMRMToken(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
new file mode 100644
index 00000000000..7c807a5b602
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher;
+import org.apache.hadoop.util.ZKUtil;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestZKRMStateStoreZKClientConnections extends
+ ClientBaseWithFixes {
+ private static final int ZK_OP_WAIT_TIME = 3000;
+ private Log LOG =
+ LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
+
+ class TestZKClient {
+ ZKRMStateStore store;
+ boolean forExpire = false;
+ TestForwardingWatcher watcher;
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
+
+ protected class TestZKRMStateStore extends ZKRMStateStore {
+ public TestZKRMStateStore(Configuration conf, String workingZnode)
+ throws Exception {
+ init(conf);
+ start();
+ assertTrue(znodeWorkingPath.equals(workingZnode));
+ }
+
+ @Override
+ public ZooKeeper getNewZooKeeper()
+ throws IOException, InterruptedException {
+ return createClient(watcher, hostPort, 100);
+ }
+
+ @Override
+ public synchronized void processWatchEvent(WatchedEvent event)
+ throws Exception {
+
+ if (forExpire) {
+ // a hack... couldn't find a way to trigger expired event.
+ WatchedEvent expriredEvent = new WatchedEvent(
+ Watcher.Event.EventType.None,
+ Watcher.Event.KeeperState.Expired, null);
+ super.processWatchEvent(expriredEvent);
+ forExpire = false;
+ syncBarrier.await();
+ } else {
+ super.processWatchEvent(event);
+ }
+ }
+ }
+
+ private class TestForwardingWatcher extends
+ ClientBaseWithFixes.CountdownWatcher {
+ public void process(WatchedEvent event) {
+ super.process(event);
+ try {
+ if (store != null) {
+ store.processWatchEvent(event);
+ }
+ } catch (Throwable t) {
+ LOG.error("Failed to process watcher event " + event + ": "
+ + StringUtils.stringifyException(t));
+ }
+ }
+ }
+
+ public RMStateStore getRMStateStore(Configuration conf) throws Exception {
+ String workingZnode = "/Test";
+ conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+ watcher = new TestForwardingWatcher();
+ this.store = new TestZKRMStateStore(conf, workingZnode);
+ return this.store;
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testZKClientDisconnectAndReconnect()
+ throws Exception {
+
+ TestZKClient zkClientTester = new TestZKClient();
+ String path = "/test";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
+ ZKRMStateStore store =
+ (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+
+ // trigger watch
+ store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ store.getDataWithRetries(path, true);
+ store.setDataWithRetries(path, "newBytes".getBytes(), 0);
+
+ stopServer();
+ zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
+ try {
+ store.getDataWithRetries(path, true);
+ fail("Expected ZKClient time out exception");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains(
+ "Wait for ZKClient creation timed out"));
+ }
+
+ // ZKRMStateStore Session restored
+ startServer();
+ zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
+ byte[] ret = null;
+ try {
+ ret = store.getDataWithRetries(path, true);
+ } catch (Exception e) {
+ String error = "ZKRMStateStore Session restore failed";
+ LOG.error(error, e);
+ fail(error);
+ }
+ Assert.assertEquals("newBytes", new String(ret));
+ }
+
+ @Test(timeout = 20000)
+ public void testZKSessionTimeout() throws Exception {
+
+ TestZKClient zkClientTester = new TestZKClient();
+ String path = "/test";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
+ ZKRMStateStore store =
+ (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+
+ // a hack to trigger expired event
+ zkClientTester.forExpire = true;
+
+ // trigger watch
+ store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ store.getDataWithRetries(path, true);
+ store.setDataWithRetries(path, "bytes".getBytes(), 0);
+
+ zkClientTester.syncBarrier.await();
+ // after this point, expired event has already been processed.
+
+ try {
+ byte[] ret = store.getDataWithRetries(path, false);
+ Assert.assertEquals("bytes", new String(ret));
+ } catch (Exception e) {
+ String error = "New session creation failed";
+ LOG.error(error, e);
+ fail(error);
+ }
+ }
+
+ @Test (timeout = 20000)
+ public void testSetZKAcl() {
+ TestZKClient zkClientTester = new TestZKClient();
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "world:anyone:rwca");
+ try {
+ zkClientTester.store.zkClient.delete(zkClientTester.store
+ .znodeWorkingPath, -1);
+ fail("Shouldn't be able to delete path");
+ } catch (Exception e) {/* expected behavior */}
+ }
+
+ @Test (timeout = 20000)
+ public void testInvalidZKAclConfiguration() {
+ TestZKClient zkClientTester = new TestZKClient();
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "randomstring&*");
+ try {
+ zkClientTester.getRMStateStore(conf);
+ fail("ZKRMStateStore created with bad ACL");
+ } catch (ZKUtil.BadAclFormatException bafe) {
+ // expected behavior
+ } catch (Exception e) {
+ String error = "Incorrect exception on BadAclFormat";
+ LOG.error(error, e);
+ fail(error);
+ }
+ }
+}