YARN-895. Changed RM state-store to not crash immediately if RM restarts while the state-store is down. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547538 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-12-03 19:28:02 +00:00
parent 7af12ab207
commit 9c95015bb4
8 changed files with 148 additions and 16 deletions

View File

@ -191,6 +191,9 @@ Release 2.4.0 - UNRELEASED
YARN-1416. Fixed a few invalid transitions in RMApp, RMAppAttempt and in some YARN-1416. Fixed a few invalid transitions in RMApp, RMAppAttempt and in some
tests. (Jian He via vinodkv) tests. (Jian He via vinodkv)
YARN-895. Changed RM state-store to not crash immediately if RM restarts while
the state-store is down. (Jian He via vinodkv)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -305,4 +305,9 @@
<Bug pattern="NM_CLASS_NOT_EXCEPTION" /> <Bug pattern="NM_CLASS_NOT_EXCEPTION" />
</Match> </Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -301,22 +301,30 @@ public class YarnConfiguration extends Configuration {
public static final String RM_STORE = RM_PREFIX + "store.class"; public static final String RM_STORE = RM_PREFIX + "store.class";
/** URI for FileSystemRMStateStore */ /** URI for FileSystemRMStateStore */
public static final String FS_RM_STATE_STORE_URI = public static final String FS_RM_STATE_STORE_URI = RM_PREFIX
RM_PREFIX + "fs.state-store.uri"; + "fs.state-store.uri";
public static final String FS_RM_STATE_STORE_RETRY_POLICY_SPEC = RM_PREFIX
+ "fs.state-store.retry-policy-spec";
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
"2000, 500";
/** /**
* Comma separated host:port pairs, each corresponding to a ZK server for * Comma separated host:port pairs, each corresponding to a ZK server for
* ZKRMStateStore * ZKRMStateStore
*/ */
public static final String ZK_STATE_STORE_PREFIX = public static final String ZK_STATE_STORE_PREFIX =
RM_PREFIX + "zk.state-store."; RM_PREFIX + "zk-state-store.";
public static final String ZK_RM_STATE_STORE_NUM_RETRIES = public static final String ZK_RM_STATE_STORE_NUM_RETRIES =
ZK_STATE_STORE_PREFIX + "num-retries"; ZK_STATE_STORE_PREFIX + "num-retries";
public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3; public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 500;
/** retry interval when connecting to zookeeper*/
public static final String ZK_RM_STATE_STORE_RETRY_INTERVAL_MS =
ZK_STATE_STORE_PREFIX + "retry-interval-ms";
public static final long DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS = 2000;
public static final String ZK_RM_STATE_STORE_ADDRESS = public static final String ZK_RM_STATE_STORE_ADDRESS =
ZK_STATE_STORE_PREFIX + "address"; ZK_STATE_STORE_PREFIX + "address";
/** Timeout in millisec for ZK server connection for ZKRMStateStore */ /** Timeout in millisec for ZK server connection for ZKRMStateStore */
public static final String ZK_RM_STATE_STORE_TIMEOUT_MS = public static final String ZK_RM_STATE_STORE_TIMEOUT_MS =
ZK_STATE_STORE_PREFIX + "timeout.ms"; ZK_STATE_STORE_PREFIX + "timeout-ms";
public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000; public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000;
/** Parent znode path under which ZKRMStateStore will create znodes */ /** Parent znode path under which ZKRMStateStore will create znodes */
public static final String ZK_RM_STATE_STORE_PARENT_PATH = public static final String ZK_RM_STATE_STORE_PARENT_PATH =

View File

@ -283,8 +283,8 @@
is implicitly fenced, meaning a single ResourceManager is is implicitly fenced, meaning a single ResourceManager is
able to use the store at any point in time. More details on this, along able to use the store at any point in time. More details on this, along
with setting up appropriate ACLs is discussed under the description for with setting up appropriate ACLs is discussed under the description for
yarn.resourcemanager.zk.state-store.root-node.acl.</description> yarn.resourcemanager.zk-state-store.root-node.acl.</description>
<name>yarn.resourcemanager.zk.state-store.address</name> <name>yarn.resourcemanager.zk-state-store.address</name>
<!--value>127.0.0.1:2181</value--> <!--value>127.0.0.1:2181</value-->
</property> </property>
@ -293,8 +293,15 @@
ZooKeeper. This may be supplied when using ZooKeeper. This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description> as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.num-retries</name> <name>yarn.resourcemanager.zk-state-store.num-retries</name>
<value>3</value> <value>500</value>
</property>
<property>
<description>Retry interval in milliseconds when ZKRMStateStore tries to
connect to ZooKeeper.</description>
<name>yarn.resourcemanager.zk-state-store.retry-interval-ms</name>
<value>2000</value>
</property> </property>
<property> <property>
@ -302,16 +309,20 @@
stored. This must be supplied when using stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description> as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.parent-path</name> <name>yarn.resourcemanager.zk-state-store.parent-path</name>
<value>/rmstore</value> <value>/rmstore</value>
</property> </property>
<property> <property>
<description>Timeout when connecting to ZooKeeper. <description>ZooKeeper session timeout in milliseconds. Session expiration
is managed by the ZooKeeper cluster itself, not by the client. This value is
used by the cluster to determine when the client's session expires.
Expirations happens when the cluster does not hear from the client within
the specified session timeout period (i.e. no heartbeat).
This may be supplied when using This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description> as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.timeout.ms</name> <name>yarn.resourcemanager.zk-state-store.timeout-ms</name>
<value>60000</value> <value>60000</value>
</property> </property>
@ -320,7 +331,7 @@
This may be supplied when using This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description> as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk.state-store.acl</name> <name>yarn.resourcemanager.zk-state-store.acl</name>
<value>world:anyone:rwcda</value> <value>world:anyone:rwcda</value>
</property> </property>
@ -336,7 +347,7 @@
permissions. permissions.
By default, when this property is not set, we use the ACLs from By default, when this property is not set, we use the ACLs from
yarn.resourcemanager.zk.state-store.acl for shared admin access and yarn.resourcemanager.zk-state-store.acl for shared admin access and
rm-address:cluster-timestamp for username-based exclusive create-delete rm-address:cluster-timestamp for username-based exclusive create-delete
access. access.
@ -346,7 +357,7 @@
ResourceManagers have shared admin access and the Active ResourceManger ResourceManagers have shared admin access and the Active ResourceManger
takes over (exclusively) the create-delete access. takes over (exclusively) the create-delete access.
</description> </description>
<name>yarn.resourcemanager.zk.state-store.root-node.acl</name> <name>yarn.resourcemanager.zk-state-store.root-node.acl</name>
</property> </property>
<property> <property>
@ -359,6 +370,16 @@
<!--value>hdfs://localhost:9000/rmstore</value--> <!--value>hdfs://localhost:9000/rmstore</value-->
</property> </property>
<property>
<description>hdfs client retry policy specification. hdfs client retry
is always enabled. Specified in pairs of sleep-time and number-of-retries
and (t0, n0), (t1, n1), ..., the first n0 retries sleep t0 milliseconds on
average, the following n1 retries sleep t1 milliseconds on average, and so on.
</description>
<name>yarn.resourcemanager.fs.state-store.retry-policy-spec</name>
<value>2000, 500</value>
</property>
<property> <property>
<description>Enable RM high-availability. When enabled, <description>Enable RM high-availability. When enabled,
(1) The RM starts in the Standby mode by default, and transitions to (1) The RM starts in the Standby mode by default, and transitions to

View File

@ -94,7 +94,14 @@ public class FileSystemRMStateStore extends RMStateStore {
// create filesystem only now, as part of service-start. By this time, RM is // create filesystem only now, as part of service-start. By this time, RM is
// authenticated with kerberos so we are good to create a file-system // authenticated with kerberos so we are good to create a file-system
// handle. // handle.
fs = fsWorkingPath.getFileSystem(getConfig()); Configuration conf = new Configuration(getConfig());
conf.setBoolean("dfs.client.retry.policy.enabled", true);
String retryPolicy =
conf.get(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC);
conf.set("dfs.client.retry.policy.spec", retryPolicy);
fs = fsWorkingPath.getFileSystem(conf);
fs.mkdirs(rmDTSecretManagerRoot); fs.mkdirs(rmDTSecretManagerRoot);
fs.mkdirs(rmAppRoot); fs.mkdirs(rmAppRoot);
} }

View File

@ -82,6 +82,7 @@ public class ZKRMStateStore extends RMStateStore {
private String zkHostPort = null; private String zkHostPort = null;
private int zkSessionTimeout; private int zkSessionTimeout;
private long zkRetryInterval;
private List<ACL> zkAcl; private List<ACL> zkAcl;
private String zkRootNodePath; private String zkRootNodePath;
private String rmDTSecretManagerRoot; private String rmDTSecretManagerRoot;
@ -161,6 +162,9 @@ public class ZKRMStateStore extends RMStateStore {
zkSessionTimeout = zkSessionTimeout =
conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS); YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS);
zkRetryInterval =
conf.getLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS);
// Parse authentication from configuration. // Parse authentication from configuration.
String zkAclConf = String zkAclConf =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL, conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL,
@ -810,6 +814,9 @@ public class ZKRMStateStore extends RMStateStore {
} }
} catch (KeeperException ke) { } catch (KeeperException ke) {
if (shouldRetry(ke.code()) && ++retry < numRetries) { if (shouldRetry(ke.code()) && ++retry < numRetries) {
LOG.info("Waiting for zookeeper to be connected, retry no. + "
+ retry);
Thread.sleep(zkRetryInterval);
continue; continue;
} }
throw ke; throw ke;

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -33,7 +36,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test; import org.junit.Test;
@ -81,6 +86,8 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
workingDirPathURI.toString()); workingDirPathURI.toString());
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
"100,6000");
this.store = new TestFileSystemRMStore(conf); this.store = new TestFileSystemRMStore(conf);
return store; return store;
} }
@ -139,4 +146,46 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test (timeout = 30000)
public void testFSRMStateStoreClientRetry() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
final RMStateStore store = fsTester.getRMStateStore();
store.setRMDispatcher(new TestDispatcher());
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
cluster.shutdownNameNodes();
Thread clientThread = new Thread() {
@Override
public void run() {
try {
store.storeApplicationStateInternal("application1",
(ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
.newApplicationStateData(111, 111, "user", null,
RMAppState.ACCEPTED, "diagnostics", 333));
} catch (Exception e) {
// TODO 0 datanode exception will not be retried by dfs client, fix
// that separately.
if (!e.getMessage().contains("could only be replicated" +
" to 0 nodes instead of minReplication (=1)")) {
assertionFailedInThread.set(true);
}
e.printStackTrace();
}
}
};
Thread.sleep(2000);
clientThread.start();
cluster.restartNameNode();
clientThread.join();
Assert.assertFalse(assertionFailedInThread.get());
} finally {
cluster.shutdown();
}
}
} }

View File

@ -37,6 +37,7 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -114,6 +115,37 @@ public class TestZKRMStateStoreZKClientConnections extends
} }
} }
@Test (timeout = 20000)
public void testZKClientRetry() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
final String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
final ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
stopServer();
Thread clientThread = new Thread() {
@Override
public void run() {
try {
store.getDataWithRetries(path, true);
} catch (Exception e) {
e.printStackTrace();
assertionFailedInThread.set(true);
}
}
};
Thread.sleep(2000);
startServer();
clientThread.join();
Assert.assertFalse(assertionFailedInThread.get());
}
@Test(timeout = 20000) @Test(timeout = 20000)
public void testZKClientDisconnectAndReconnect() public void testZKClientDisconnectAndReconnect()
throws Exception { throws Exception {