YARN-353. Add Zookeeper-based store implementation for RMStateStore. Contributed by Bikas Saha, Jian He and Karthik Kambatla.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524829 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hitesh Shah 2013-09-19 20:39:10 +00:00
parent 8de47fc570
commit 8628c1704b
10 changed files with 1016 additions and 27 deletions

View File

@ -53,6 +53,8 @@ import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Copy-paste of ClientBase from ZooKeeper, but without any of the
* JMXEnv verification. There seems to be a bug ZOOKEEPER-1438
@ -111,7 +113,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
synchronized boolean isConnected() {
return connected;
}
synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException {
@VisibleForTesting
public synchronized void waitForConnected(long timeout)
throws InterruptedException, TimeoutException {
long expire = Time.now() + timeout;
long left = timeout;
while(!connected && left > 0) {
@ -123,7 +127,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
}
}
synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException {
@VisibleForTesting
public synchronized void waitForDisconnected(long timeout)
throws InterruptedException, TimeoutException {
long expire = Time.now() + timeout;
long left = timeout;
while(connected && left > 0) {

View File

@ -27,10 +27,15 @@ Release 2.3.0 - UNRELEASED
IMPROVEMENTS
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
YARN-1098. Separate out RM services into Always On and Active (Karthik
Kambatla via bikas)
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
YARN-353. Add Zookeeper-based store implementation for RMStateStore.
(Bikas Saha, Jian He and Karthik Kambatla via hitesh)
OPTIMIZATIONS
BUG FIXES

View File

@ -276,12 +276,40 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
public static final boolean DEFAULT_RM_HA_ENABLED = false;
////////////////////////////////
// RM state store configs
////////////////////////////////
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
/** URI for FileSystemRMStateStore */
public static final String FS_RM_STATE_STORE_URI =
RM_PREFIX + "fs.state-store.uri";
/**
* Comma separated host:port pairs, each corresponding to a ZK server for
* ZKRMStateStore
*/
public static final String ZK_STATE_STORE_PREFIX =
RM_PREFIX + "zk.state-store.";
public static final String ZK_RM_STATE_STORE_NUM_RETRIES =
ZK_STATE_STORE_PREFIX + "num-retries";
public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3;
public static final String ZK_RM_STATE_STORE_ADDRESS =
ZK_STATE_STORE_PREFIX + "address";
/** Timeout in millisec for ZK server connection for ZKRMStateStore */
public static final String ZK_RM_STATE_STORE_TIMEOUT_MS =
ZK_STATE_STORE_PREFIX + "timeout.ms";
public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000;
/** Parent znode path under which ZKRMStateStore will create znodes */
public static final String ZK_RM_STATE_STORE_PARENT_PATH =
ZK_STATE_STORE_PREFIX + "parent-path";
public static final String DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH = "/rmstore";
/** ACL for znodes in ZKRMStateStore */
public static final String ZK_RM_STATE_STORE_ACL =
ZK_STATE_STORE_PREFIX + "acl";
public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
"world:anyone:rwcda";
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =

View File

@ -258,6 +258,51 @@
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
</property>
<property>
<description>Host:Port of the ZooKeeper server where RM state will
be stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.address</name>
<!--value>127.0.0.1:2181</value-->
</property>
<property>
<description>Number of times ZKRMStateStore tries to connect to
ZooKeeper. This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.num-retries</name>
<value>3</value>
</property>
<property>
<description>Full path of the ZooKeeper znode where RM state will be
stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.parent-path</name>
<value>/rmstore</value>
</property>
<property>
<description>Timeout when connecting to ZooKeeper.
This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.timeout.ms</name>
<value>60000</value>
</property>
<property>
<description>ACL's to be used for ZooKeeper znodes.
This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.acl</name>
<value>world:anyone:rwcda</value>
</property>
<property>
<description>URI pointing to the location of the FileSystem path where
RM state will be stored. This must be supplied when using

View File

@ -41,6 +41,16 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>

View File

@ -63,12 +63,6 @@ public class FileSystemRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
private static final String ROOT_DIR_NAME = "FSRMStateRoot";
private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
private static final String RM_APP_ROOT = "RMAppRoot";
private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
protected FileSystem fs;

View File

@ -65,6 +65,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
*/
public abstract class RMStateStore extends AbstractService {
// constants for RM App state and RMDTSecretManagerState.
protected static final String RM_APP_ROOT = "RMAppRoot";
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
public RMStateStore() {
@ -464,8 +472,9 @@ public abstract class RMStateStore extends AbstractService {
(ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
.newApplicationAttemptStateData(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens);
LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
storeApplicationAttemptState(attemptState.getAttemptId().toString(),
attemptStateData);
} catch (Exception e) {

View File

@ -0,0 +1,621 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Private
@Unstable
public class ZKRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
private int numRetries;
private String zkHostPort = null;
private int zkSessionTimeout;
private List<ACL> zkAcl;
private String zkRootNodePath;
private String rmDTSecretManagerRoot;
private String rmAppRoot;
private String dtSequenceNumberPath = null;
@VisibleForTesting
protected String znodeWorkingPath;
@VisibleForTesting
protected ZooKeeper zkClient;
private ZooKeeper oldZkClient;
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS);
if (zkHostPort == null) {
throw new YarnRuntimeException("No server address specified for " +
"zookeeper state store for Resource Manager recovery. " +
YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS + " is not configured.");
}
numRetries =
conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES);
znodeWorkingPath =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkSessionTimeout =
conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS);
// Parse authentication from configuration.
String zkAclConf =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_ACL);
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
try {
zkAcl = ZKUtil.parseACLs(zkAclConf);
} catch (ZKUtil.BadAclFormatException bafe) {
LOG.error("Invalid format for " + YarnConfiguration.ZK_RM_STATE_STORE_ACL);
throw bafe;
}
zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
}
@Override
public synchronized void startInternal() throws Exception {
// createConnection for future API calls
createConnection();
// ensure root dirs exist
createRootDir(znodeWorkingPath);
createRootDir(zkRootNodePath);
createRootDir(rmDTSecretManagerRoot);
createRootDir(rmAppRoot);
}
private void createRootDir(String rootPath) throws Exception {
try {
createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT);
} catch (KeeperException ke) {
if (ke.code() != Code.NODEEXISTS) {
throw ke;
}
}
}
private synchronized void closeZkClients() throws IOException {
if (zkClient != null) {
try {
zkClient.close();
} catch (InterruptedException e) {
throw new IOException("Interrupted while closing ZK", e);
}
zkClient = null;
}
if (oldZkClient != null) {
try {
oldZkClient.close();
} catch (InterruptedException e) {
throw new IOException("Interrupted while closing old ZK", e);
}
oldZkClient = null;
}
}
@Override
protected synchronized void closeInternal() throws Exception {
closeZkClients();
}
@Override
public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
// recover DelegationTokenSecretManager
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
return rmState;
}
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
List<String> childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true);
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
rmState.rmSecretManagerState.dtSequenceNumber =
Integer.parseInt(childNodeName.split("_")[1]);
continue;
}
String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
try {
if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
DelegationKey key = new DelegationKey();
key.readFields(fsIn);
rmState.rmSecretManagerState.masterKeyState.add(key);
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifier identifier =
new RMDelegationTokenIdentifier();
identifier.readFields(fsIn);
long renewDate = fsIn.readLong();
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate);
}
} finally {
is.close();
}
}
}
private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = zkClient.getChildren(rmAppRoot, true);
List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>();
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
LOG.info("Loading application from znode: " + childNodeName);
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appStateData =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(),
appStateData.getApplicationSubmissionContext(),
appStateData.getUser());
if (!appId.equals(appState.context.getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application id");
}
rmState.appState.put(appId, appState);
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt
LOG.info("Loading application attempt from znode: " + childNodeName);
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(childNodeName);
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials);
if (!attemptId.equals(attemptState.getAttemptId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application attempt id");
}
attempts.add(attemptState);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
}
}
// go through all attempts and add them to their apps
for (ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = rmState.appState.get(appId);
if (appState != null) {
appState.attempts.put(attemptState.getAttemptId(), attemptState);
} else {
// the application znode may have been removed when the application
// completed but the RM might have stopped before it could remove the
// application attempt znodes
LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId());
deleteWithRetries(
getNodePath(rmAppRoot, attemptState.getAttemptId().toString()),
0);
}
}
}
@Override
public synchronized void storeApplicationState(
String appId, ApplicationStateDataPBImpl appStateDataPB) throws
Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
createWithRetries(
nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
}
@Override
public synchronized void storeApplicationAttemptState(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptId + " at: "
+ nodeCreatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
}
@Override
public synchronized void removeApplicationState(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
String nodeRemovePath = getNodePath(rmAppRoot, appId);
ArrayList<Op> opList = new ArrayList<Op>();
opList.add(Op.delete(nodeRemovePath, 0));
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
opList.add(Op.delete(attemptRemovePath, 0));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
+ " and its attempts.");
}
doMultiWithRetries(opList);
}
@Override
protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
ArrayList<Op> opList = new ArrayList<Op>();
// store RM delegation token
String nodeCreatePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
try {
rmDTIdentifier.write(fsOut);
fsOut.writeLong(renewDate);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing RMDelegationToken_" +
rmDTIdentifier.getSequenceNumber());
}
opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
CreateMode.PERSISTENT));
} finally {
os.close();
}
// store sequence number
String latestSequenceNumberPath =
getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
latestSequenceNumber);
}
if (dtSequenceNumberPath != null) {
opList.add(Op.delete(dtSequenceNumberPath, 0));
}
opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
CreateMode.PERSISTENT));
dtSequenceNumberPath = latestSequenceNumberPath;
doMultiWithRetries(opList);
}
@Override
protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
String nodeRemovePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
deleteWithRetries(nodeRemovePath, 0);
}
@Override
protected synchronized void storeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception {
String nodeCreatePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing RMDelegationKey_" + delegationKey.getKeyId());
}
delegationKey.write(fsOut);
try {
createWithRetries(nodeCreatePath, os.toByteArray(), zkAcl,
CreateMode.PERSISTENT);
} finally {
os.close();
}
}
@Override
protected synchronized void removeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception {
String nodeRemovePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
deleteWithRetries(nodeRemovePath, 0);
}
// ZK related code
/**
* Watcher implementation which forward events to the ZKRMStateStore This
* hides the ZK methods of the store from its public interface
*/
private final class ForwardingWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
try {
ZKRMStateStore.this.processWatchEvent(event);
} catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": "
+ StringUtils.stringifyException(t));
}
}
}
@VisibleForTesting
public synchronized void processWatchEvent(WatchedEvent event)
throws Exception {
Event.EventType eventType = event.getType();
LOG.info("Watcher event type: " + eventType + " with state:"
+ event.getState() + " for path:" + event.getPath() + " for " + this);
if (eventType == Event.EventType.None) {
// the connection state has changed
switch (event.getState()) {
case SyncConnected:
LOG.info("ZKRMStateStore Session connected");
if (oldZkClient != null) {
// the SyncConnected must be from the client that sent Disconnected
zkClient = oldZkClient;
oldZkClient = null;
ZKRMStateStore.this.notifyAll();
LOG.info("ZKRMStateStore Session restored");
}
break;
case Disconnected:
LOG.info("ZKRMStateStore Session disconnected");
oldZkClient = zkClient;
zkClient = null;
break;
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
LOG.info("Session expired");
createConnection();
break;
default:
LOG.error("Unexpected Zookeeper" +
" watch event state: " + event.getState());
break;
}
}
}
@VisibleForTesting
String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
@VisibleForTesting
public String createWithRetries(
final String path, final byte[] data, final List<ACL> acl,
final CreateMode mode) throws Exception {
return new ZKAction<String>() {
@Override
public String run() throws KeeperException, InterruptedException {
return zkClient.create(path, data, acl, mode);
}
}.runWithRetries();
}
private void deleteWithRetries(final String path, final int version)
throws Exception {
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
/**
* Call exists() to leave a watch on the node denoted by path.
* Delete node if exists. To pass the existence information to the
* caller, call delete irrespective of whether node exists or not.
*/
if (zkClient.exists(path, true) == null) {
LOG.error("Trying to delete a path (" + path
+ ") that doesn't exist.");
}
zkClient.delete(path, version);
return null;
}
}.runWithRetries();
}
private void doMultiWithRetries(final ArrayList<Op> opList) throws Exception {
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
zkClient.multi(opList);
return null;
}
}.runWithRetries();
}
@VisibleForTesting
public void setDataWithRetries(final String path, final byte[] data,
final int version) throws Exception {
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
zkClient.setData(path, data, version);
return null;
}
}.runWithRetries();
}
@VisibleForTesting
public byte[] getDataWithRetries(final String path, final boolean watch)
throws Exception {
return new ZKAction<byte[]>() {
@Override
public byte[] run() throws KeeperException, InterruptedException {
Stat stat = new Stat();
return zkClient.getData(path, watch, stat);
}
}.runWithRetries();
}
private abstract class ZKAction<T> {
// run() expects synchronization on ZKRMStateStore.this
abstract T run() throws KeeperException, InterruptedException;
T runWithCheck() throws Exception {
long startTime = System.currentTimeMillis();
synchronized (ZKRMStateStore.this) {
while (zkClient == null) {
ZKRMStateStore.this.wait(zkSessionTimeout);
if (zkClient != null) {
break;
}
if (System.currentTimeMillis() - startTime > zkSessionTimeout) {
throw new IOException("Wait for ZKClient creation timed out");
}
}
return run();
}
}
T runWithRetries() throws Exception {
int retry = 0;
while (true) {
try {
return runWithCheck();
} catch (KeeperException ke) {
if (shouldRetry(ke.code()) && ++retry < numRetries) {
continue;
}
throw ke;
}
}
}
}
private static boolean shouldRetry(Code code) {
switch (code) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
return true;
default:
break;
}
return false;
}
private synchronized void createConnection()
throws IOException, InterruptedException {
closeZkClients();
for (int retries = 0; retries < numRetries && zkClient == null;
retries++) {
try {
zkClient = getNewZooKeeper();
} catch (IOException ioe) {
// Retry in case of network failures
LOG.info("Failed to connect to the ZooKeeper on attempt - " +
(retries + 1));
ioe.printStackTrace();
}
}
if (zkClient == null) {
LOG.error("Unable to connect to Zookeeper");
throw new YarnRuntimeException("Unable to connect to Zookeeper");
}
ZKRMStateStore.this.notifyAll();
LOG.info("Created new ZK connection");
}
// protected to mock for testing
@VisibleForTesting
protected synchronized ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
zk.register(new ForwardingWatcher());
return zk;
}
}

View File

@ -26,8 +26,10 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey;
@ -40,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
@ -67,13 +70,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
public class TestRMStateStore {
public class TestRMStateStore extends ClientBaseWithFixes{
public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);
class TestDispatcher implements Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
static class TestDispatcher implements
Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
ApplicationAttemptId attemptId;
Exception storedException;
@ -82,7 +89,8 @@ public class TestRMStateStore {
@SuppressWarnings("rawtypes")
@Override
public void register(Class<? extends Enum> eventType, EventHandler handler) {
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
}
@Override
@ -108,10 +116,18 @@ public class TestRMStateStore {
boolean isFinalStateValid() throws Exception;
}
@Test
public void testZKRMStateStoreRealZK() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
}
@Test
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
testRMAppStateStore(fsTester);
@ -121,6 +137,41 @@ public class TestRMStateStore {
}
}
class TestZKRMStateStoreTester implements RMStateStoreHelper {
ZooKeeper client;
ZKRMStateStore store;
class TestZKRMStateStore extends ZKRMStateStore {
public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception {
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
}
@Override
public ZooKeeper getNewZooKeeper() throws IOException {
return client;
}
}
public RMStateStore getRMStateStore() throws Exception {
String workingZnode = "/Test";
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
}
@Override
public boolean isFinalStateValid() throws Exception {
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
return nodes.size() == 1;
}
}
class TestFSRMStateStoreTester implements RMStateStoreHelper {
Path workingDirPathURI;
FileSystemRMStateStore store;
@ -149,7 +200,8 @@ public class TestRMStateStore {
@Override
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, workingDirPathURI.toString());
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
workingDirPathURI.toString());
this.store = new TestFileSystemRMStore(conf);
return store;
}
@ -158,11 +210,7 @@ public class TestRMStateStore {
public boolean isFinalStateValid() throws Exception {
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(workingDirPathURI);
if(files.length == 1) {
// only store root directory should exist
return true;
}
return false;
return files.length == 1;
}
}
@ -183,9 +231,10 @@ public class TestRMStateStore {
dispatcher.notified = false;
}
void storeApp(RMStateStore store, ApplicationId appId, long time)
throws Exception {
ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl();
void storeApp(
RMStateStore store, ApplicationId appId, long time) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appId);
RMApp mockApp = mock(RMApp.class);
@ -216,7 +265,8 @@ public class TestRMStateStore {
return container.getId();
}
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
throws Exception {
long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore();
@ -271,7 +321,8 @@ public class TestRMStateStore {
RMApp mockRemovedApp = mock(RMApp.class);
HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
new HashMap<ApplicationAttemptId, RMAppAttempt>();
ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl();
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appIdRemoved);
when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
@ -288,7 +339,8 @@ public class TestRMStateStore {
// load state
store = stateStoreHelper.getRMStateStore();
RMState state = store.loadState();
Map<ApplicationId, ApplicationState> rmAppState = state.getApplicationState();
Map<ApplicationId, ApplicationState> rmAppState =
state.getApplicationState();
ApplicationState appState = rmAppState.get(appId1);
// app is loaded
@ -362,7 +414,8 @@ public class TestRMStateStore {
store.loadState().getRMDTSecretManagerState();
Assert.assertEquals(token1, secretManagerState.getTokenState());
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber());
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
}
private Token<AMRMTokenIdentifier> generateAMRMToken(

View File

@ -0,0 +1,218 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CyclicBarrier;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestZKRMStateStoreZKClientConnections extends
ClientBaseWithFixes {
private static final int ZK_OP_WAIT_TIME = 3000;
private Log LOG =
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
class TestZKClient {
ZKRMStateStore store;
boolean forExpire = false;
TestForwardingWatcher watcher;
CyclicBarrier syncBarrier = new CyclicBarrier(2);
protected class TestZKRMStateStore extends ZKRMStateStore {
public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception {
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
}
@Override
public ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
return createClient(watcher, hostPort, 100);
}
@Override
public synchronized void processWatchEvent(WatchedEvent event)
throws Exception {
if (forExpire) {
// a hack... couldn't find a way to trigger expired event.
WatchedEvent expriredEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null);
super.processWatchEvent(expriredEvent);
forExpire = false;
syncBarrier.await();
} else {
super.processWatchEvent(event);
}
}
}
private class TestForwardingWatcher extends
ClientBaseWithFixes.CountdownWatcher {
public void process(WatchedEvent event) {
super.process(event);
try {
if (store != null) {
store.processWatchEvent(event);
}
} catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": "
+ StringUtils.stringifyException(t));
}
}
}
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
String workingZnode = "/Test";
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
watcher = new TestForwardingWatcher();
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
}
}
@Test(timeout = 20000)
public void testZKClientDisconnectAndReconnect()
throws Exception {
TestZKClient zkClientTester = new TestZKClient();
String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
// trigger watch
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
store.getDataWithRetries(path, true);
store.setDataWithRetries(path, "newBytes".getBytes(), 0);
stopServer();
zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
try {
store.getDataWithRetries(path, true);
fail("Expected ZKClient time out exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains(
"Wait for ZKClient creation timed out"));
}
// ZKRMStateStore Session restored
startServer();
zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
byte[] ret = null;
try {
ret = store.getDataWithRetries(path, true);
} catch (Exception e) {
String error = "ZKRMStateStore Session restore failed";
LOG.error(error, e);
fail(error);
}
Assert.assertEquals("newBytes", new String(ret));
}
@Test(timeout = 20000)
public void testZKSessionTimeout() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
// a hack to trigger expired event
zkClientTester.forExpire = true;
// trigger watch
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
store.getDataWithRetries(path, true);
store.setDataWithRetries(path, "bytes".getBytes(), 0);
zkClientTester.syncBarrier.await();
// after this point, expired event has already been processed.
try {
byte[] ret = store.getDataWithRetries(path, false);
Assert.assertEquals("bytes", new String(ret));
} catch (Exception e) {
String error = "New session creation failed";
LOG.error(error, e);
fail(error);
}
}
@Test (timeout = 20000)
public void testSetZKAcl() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "world:anyone:rwca");
try {
zkClientTester.store.zkClient.delete(zkClientTester.store
.znodeWorkingPath, -1);
fail("Shouldn't be able to delete path");
} catch (Exception e) {/* expected behavior */}
}
@Test (timeout = 20000)
public void testInvalidZKAclConfiguration() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "randomstring&*");
try {
zkClientTester.getRMStateStore(conf);
fail("ZKRMStateStore created with bad ACL");
} catch (ZKUtil.BadAclFormatException bafe) {
// expected behavior
} catch (Exception e) {
String error = "Incorrect exception on BadAclFormat";
LOG.error(error, e);
fail(error);
}
}
}