YARN-1029. Added embedded leader election in the ResourceManager. Contributed by Karthik Kambatla.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1556103 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-01-07 01:56:11 +00:00
parent 5241aa4cdd
commit c3cc855d27
22 changed files with 779 additions and 309 deletions

View File

@ -767,7 +767,8 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
LOG.debug("Created new connection for " + this); LOG.debug("Created new connection for " + this);
} }
void terminateConnection() { @InterfaceAudience.Private
public void terminateConnection() {
if (zkClient == null) { if (zkClient == null) {
return; return;
} }

View File

@ -55,6 +55,9 @@ Release 2.4.0 - UNRELEASED
YARN-1028. Added FailoverProxyProvider capability to ResourceManager to help YARN-1028. Added FailoverProxyProvider capability to ResourceManager to help
with RM failover. (Karthik Kambatla via vinodkv) with RM failover. (Karthik Kambatla via vinodkv)
YARN-1029. Added embedded leader election in the ResourceManager. (Karthik
Kambatla via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

View File

@ -51,6 +51,22 @@ public class HAUtil {
YarnConfiguration.DEFAULT_RM_HA_ENABLED); YarnConfiguration.DEFAULT_RM_HA_ENABLED);
} }
/**
 * Reports whether automatic failover is switched on in the configuration.
 *
 * @param conf configuration to read
 * @return the boolean stored under
 *         {@link YarnConfiguration#AUTO_FAILOVER_ENABLED}, or
 *         {@link YarnConfiguration#DEFAULT_AUTO_FAILOVER_ENABLED} when the
 *         key is not set
 */
public static boolean isAutomaticFailoverEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ENABLED);
}
/**
 * Reports whether automatic failover is both enabled and configured to use
 * the embedded elector.
 *
 * @param conf configuration to read
 * @return {@code true} only when both
 *         {@link #isAutomaticFailoverEnabled(Configuration)} and
 *         {@link #isAutomaticFailoverEmbedded(Configuration)} return
 *         {@code true}
 */
public static boolean isAutomaticFailoverEnabledAndEmbedded(
Configuration conf) {
// Short-circuits: the embedded flag is not read when failover is disabled.
return isAutomaticFailoverEnabled(conf) &&
isAutomaticFailoverEmbedded(conf);
}
/**
 * Reports whether the embedded automatic-failover flag is set. This reads
 * the flag on its own; it does not check whether automatic failover itself
 * is enabled.
 *
 * @param conf configuration to read
 * @return the boolean stored under
 *         {@link YarnConfiguration#AUTO_FAILOVER_EMBEDDED}, or
 *         {@link YarnConfiguration#DEFAULT_AUTO_FAILOVER_EMBEDDED} when the
 *         key is not set
 */
public static boolean isAutomaticFailoverEmbedded(Configuration conf) {
return conf.getBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_EMBEDDED);
}
/** /**
* Verify configuration for Resource Manager HA. * Verify configuration for Resource Manager HA.
* @param conf Configuration * @param conf Configuration
@ -162,8 +178,7 @@ public class HAUtil {
* @param conf Configuration. Please use verifyAndSetRMHAId to check. * @param conf Configuration. Please use verifyAndSetRMHAId to check.
* @return RM Id on success * @return RM Id on success
*/ */
@VisibleForTesting public static String getRMHAId(Configuration conf) {
static String getRMHAId(Configuration conf) {
return conf.get(YarnConfiguration.RM_HA_ID); return conf.get(YarnConfiguration.RM_HA_ID);
} }

View File

@ -87,6 +87,8 @@ public class YarnConfiguration extends Configuration {
//////////////////////////////// ////////////////////////////////
public static final String RM_PREFIX = "yarn.resourcemanager."; public static final String RM_PREFIX = "yarn.resourcemanager.";
public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id";
/** The address of the applications manager interface in the RM.*/ /** The address of the applications manager interface in the RM.*/
public static final String RM_ADDRESS = public static final String RM_ADDRESS =
RM_PREFIX + "address"; RM_PREFIX + "address";
@ -278,6 +280,36 @@ public class YarnConfiguration extends Configuration {
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
/**
 * ZooKeeper interaction configs. Keys built from RM_ZK_PREFIX are no longer
 * scoped to the ZK state store prefix.
 */
public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
/** Key for the ZooKeeper server address used by the RM. */
public static final String RM_ZK_ADDRESS = RM_ZK_PREFIX + "address";
/** Key for the number of ZooKeeper connection retries. */
public static final String RM_ZK_NUM_RETRIES = RM_ZK_PREFIX + "num-retries";
// NOTE(review): named DEFAULT_ZK_RM_* while the sibling defaults below use
// DEFAULT_RM_ZK_*; consider aligning the name (public constant, so this
// needs a deprecation path).
public static final int DEFAULT_ZK_RM_NUM_RETRIES = 500;
/** Key for the retry interval, in milliseconds. */
public static final String RM_ZK_RETRY_INTERVAL_MS =
RM_ZK_PREFIX + "retry-interval-ms";
public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 2000;
/** Key for the ZooKeeper timeout, in milliseconds. */
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
// NOTE(review): yarn-default.xml lists 60000 for
// yarn.resourcemanager.zk-timeout-ms, and the constant this replaces
// (DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS) was 60000 -- confirm which value
// is intended and make the two agree.
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
/** Key for the ACL applied to ZooKeeper znodes. */
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
/** Prefix for the settings that remain specific to the ZK state store. */
public static final String ZK_STATE_STORE_PREFIX =
RM_PREFIX + "zk-state-store.";
/** Parent znode path under which ZKRMStateStore will create znodes */
public static final String ZK_RM_STATE_STORE_PARENT_PATH =
ZK_STATE_STORE_PREFIX + "parent-path";
public static final String DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH = "/rmstore";
/** Root node ACLs for fencing */
public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL =
ZK_STATE_STORE_PREFIX + "root-node.acl";
/** HA related configs */ /** HA related configs */
public static final String RM_HA_PREFIX = RM_PREFIX + "ha."; public static final String RM_HA_PREFIX = RM_PREFIX + "ha.";
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
@ -296,6 +328,22 @@ public class YarnConfiguration extends Configuration {
HttpConfig.isSecure() ? RM_WEBAPP_HTTPS_ADDRESS HttpConfig.isSecure() ? RM_WEBAPP_HTTPS_ADDRESS
: RM_WEBAPP_ADDRESS)); : RM_WEBAPP_ADDRESS));
/** Prefix for automatic-failover settings, nested under RM_HA_PREFIX. */
public static final String AUTO_FAILOVER_PREFIX =
RM_HA_PREFIX + "automatic-failover.";
/** Key for the flag that turns automatic failover on; off by default. */
public static final String AUTO_FAILOVER_ENABLED =
AUTO_FAILOVER_PREFIX + "enabled";
public static final boolean DEFAULT_AUTO_FAILOVER_ENABLED = false;
/** Key for the flag that selects the embedded elector; off by default. */
public static final String AUTO_FAILOVER_EMBEDDED =
AUTO_FAILOVER_PREFIX + "embedded";
public static final boolean DEFAULT_AUTO_FAILOVER_EMBEDDED = false;
/** Key for the base znode path used by ZooKeeper-based leader election. */
public static final String AUTO_FAILOVER_ZK_BASE_PATH =
AUTO_FAILOVER_PREFIX + "zk-base-path";
public static final String DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH =
"/yarn-leader-election";
public static final String CLIENT_FAILOVER_PREFIX = public static final String CLIENT_FAILOVER_PREFIX =
YARN_PREFIX + "client.failover-"; YARN_PREFIX + "client.failover-";
public static final String CLIENT_FAILOVER_PROXY_PROVIDER = public static final String CLIENT_FAILOVER_PROXY_PROVIDER =
@ -334,36 +382,6 @@ public class YarnConfiguration extends Configuration {
+ "fs.state-store.retry-policy-spec"; + "fs.state-store.retry-policy-spec";
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC = public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
"2000, 500"; "2000, 500";
/**
* Comma separated host:port pairs, each corresponding to a ZK server for
* ZKRMStateStore
*/
public static final String ZK_STATE_STORE_PREFIX =
RM_PREFIX + "zk-state-store.";
public static final String ZK_RM_STATE_STORE_NUM_RETRIES =
ZK_STATE_STORE_PREFIX + "num-retries";
public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 500;
/** retry interval when connecting to zookeeper*/
public static final String ZK_RM_STATE_STORE_RETRY_INTERVAL_MS =
ZK_STATE_STORE_PREFIX + "retry-interval-ms";
public static final long DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS = 2000;
public static final String ZK_RM_STATE_STORE_ADDRESS =
ZK_STATE_STORE_PREFIX + "address";
/** Timeout in millisec for ZK server connection for ZKRMStateStore */
public static final String ZK_RM_STATE_STORE_TIMEOUT_MS =
ZK_STATE_STORE_PREFIX + "timeout-ms";
public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000;
/** Parent znode path under which ZKRMStateStore will create znodes */
public static final String ZK_RM_STATE_STORE_PARENT_PATH =
ZK_STATE_STORE_PREFIX + "parent-path";
public static final String DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH = "/rmstore";
/** ACL for znodes in ZKRMStateStore */
public static final String ZK_RM_STATE_STORE_ACL =
ZK_STATE_STORE_PREFIX + "acl";
public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
"world:anyone:rwcda";
public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL =
ZK_STATE_STORE_PREFIX + "root-node.acl";
/** The maximum number of completed applications RM keeps. */ /** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS = public static final String RM_MAX_COMPLETED_APPLICATIONS =

View File

@ -133,3 +133,11 @@ message RMStateVersionProto {
optional int32 major_version = 1; optional int32 major_version = 1;
optional int32 minor_version = 2; optional int32 minor_version = 2;
} }
//////////////////////////////////////////////////////////////////
///////////// RM Failover related records ////////////////////////
//////////////////////////////////////////////////////////////////
// Record with two required string fields; presumably identifies the active
// RM by cluster and RM id -- confirm against the elector code that
// serializes it.
// NOTE(review): field names mix styles (clusterid vs. rmId). Renaming does
// not change the wire format, but it does change the generated accessors.
message ActiveRMInfoProto {
required string clusterid = 1;
required string rmId = 2;
}

View File

@ -30,6 +30,12 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId> <artifactId>hadoop-yarn-api</artifactId>

View File

@ -28,37 +28,39 @@ import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.junit.AfterClass; import org.junit.After;
import org.junit.BeforeClass; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestRMFailover { public class TestRMFailover extends ClientBaseWithFixes {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(TestRMFailover.class.getName()); LogFactory.getLog(TestRMFailover.class.getName());
private static final HAServiceProtocol.StateChangeRequestInfo req =
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
private static final String RM1_NODE_ID = "rm1"; private static final String RM1_NODE_ID = "rm1";
private static final int RM1_PORT_BASE = 10000; private static final int RM1_PORT_BASE = 10000;
private static final String RM2_NODE_ID = "rm2"; private static final String RM2_NODE_ID = "rm2";
private static final int RM2_PORT_BASE = 20000; private static final int RM2_PORT_BASE = 20000;
private static final HAServiceProtocol.StateChangeRequestInfo req =
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED);
private static Configuration conf; private Configuration conf;
private static MiniYARNCluster cluster; private MiniYARNCluster cluster;
private static void setConfForRM(String rmId, String prefix, String value) { private void setConfForRM(String rmId, String prefix, String value) {
conf.set(HAUtil.addSuffix(prefix, rmId), value); conf.set(HAUtil.addSuffix(prefix, rmId), value);
} }
private static void setRpcAddressForRM(String rmId, int base) { private void setRpcAddressForRM(String rmId, int base) {
setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" + setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
(base + YarnConfiguration.DEFAULT_RM_PORT)); (base + YarnConfiguration.DEFAULT_RM_PORT));
setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" + setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
@ -73,13 +75,8 @@ public class TestRMFailover {
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)); (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
} }
private static AdminService getRMAdminService(int index) { @Before
return public void setup() throws IOException {
cluster.getResourceManager(index).getRMContext().getRMAdminService();
}
@BeforeClass
public static void setup() throws IOException {
conf = new YarnConfiguration(); conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@ -87,27 +84,22 @@ public class TestRMFailover {
setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1); cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1);
cluster.init(conf);
cluster.start();
cluster.getResourceManager(0).getRMContext().getRMAdminService()
.transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
} }
@AfterClass @After
public static void teardown() { public void teardown() {
cluster.stop(); cluster.stop();
} }
private void verifyClientConnection() { private void verifyClientConnection() {
int numRetries = 3; int numRetries = 3;
while(numRetries-- > 0) { while(numRetries-- > 0) {
Configuration conf = new YarnConfiguration(TestRMFailover.conf); Configuration conf = new YarnConfiguration(this.conf);
YarnClient client = YarnClient.createYarnClient(); YarnClient client = YarnClient.createYarnClient();
client.init(conf); client.init(conf);
client.start(); client.start();
@ -123,31 +115,68 @@ public class TestRMFailover {
fail("Client couldn't connect to the Active RM"); fail("Client couldn't connect to the Active RM");
} }
/**
 * Asserts that the NodeManagers connect to the RM, then checks that a
 * client can connect as well.
 *
 * @throws InterruptedException declared by the calls below
 * @throws YarnException declared by the calls below
 */
private void verifyConnections() throws InterruptedException, YarnException {
// 20000 is the wait limit handed to the mini cluster (presumably ms).
assertTrue("NMs failed to connect to the RM",
cluster.waitForNodeManagersToConnect(20000));
verifyClientConnection();
}
/**
 * Looks up the admin service of one ResourceManager in the mini cluster.
 *
 * @param index index of the ResourceManager within the cluster
 * @return the AdminService held by that ResourceManager's RMContext
 */
private AdminService getAdminService(int index) {
return cluster.getResourceManager(index).getRMContext().getRMAdminService();
}
/**
 * Performs a manual failover through the admin services: the currently
 * active RM is asked to become standby and the other RM to become active.
 * Asserts that the cluster then reports the other RM as active.
 *
 * @throws IOException if either transition request fails
 */
private void explicitFailover() throws IOException {
int activeRMIndex = cluster.getActiveRMIndex();
// "% 2" flips between indices 0 and 1, so this assumes exactly two RMs.
int newActiveRMIndex = (activeRMIndex + 1) % 2;
getAdminService(activeRMIndex).transitionToStandby(req);
getAdminService(newActiveRMIndex).transitionToActive(req);
assertEquals("Failover failed", newActiveRMIndex, cluster.getActiveRMIndex());
}
/**
 * Triggers a failover by stopping the currently active RM, asserts that the
 * other RM is reported active, and then restarts the stopped RM.
 *
 * @throws IOException declared by the cluster calls below
 * @throws InterruptedException declared by the cluster calls below
 * @throws YarnException declared by the cluster calls below
 */
private void failover()
throws IOException, InterruptedException, YarnException {
int activeRMIndex = cluster.getActiveRMIndex();
cluster.stopResourceManager(activeRMIndex);
// NOTE(review): this checks the active index right after the stop; it only
// holds if getActiveRMIndex() waits for the new leader -- confirm.
// "% 2" assumes exactly two RMs (indices 0 and 1).
assertEquals("Failover failed",
(activeRMIndex + 1) % 2, cluster.getActiveRMIndex());
cluster.restartResourceManager(activeRMIndex);
}
@Test @Test
public void testExplicitFailover() public void testExplicitFailover()
throws YarnException, InterruptedException, IOException { throws YarnException, InterruptedException, IOException {
assertTrue("NMs failed to connect to the RM", conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster.waitForNodeManagersToConnect(5000)); cluster.init(conf);
verifyClientConnection(); cluster.start();
getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
// Failover to the second RM explicitFailover();
getRMAdminService(0).transitionToStandby(req); verifyConnections();
getRMAdminService(1).transitionToActive(req);
assertEquals("Wrong ResourceManager is active",
HAServiceProtocol.HAServiceState.ACTIVE,
getRMAdminService(1).getServiceStatus().getState());
assertTrue("NMs failed to connect to the RM",
cluster.waitForNodeManagersToConnect(5000));
verifyClientConnection();
// Failover back to the first RM explicitFailover();
getRMAdminService(1).transitionToStandby(req); verifyConnections();
getRMAdminService(0).transitionToActive(req); }
assertEquals("Wrong ResourceManager is active",
HAServiceProtocol.HAServiceState.ACTIVE, @Test
getRMAdminService(0).getServiceStatus().getState()); public void testAutomaticFailover()
assertTrue("NMs failed to connect to the RM", throws YarnException, InterruptedException, IOException {
cluster.waitForNodeManagersToConnect(5000)); conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
verifyClientConnection(); conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, true);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster");
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 2000);
cluster.init(conf);
cluster.start();
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
failover();
verifyConnections();
failover();
verifyConnections();
} }
} }

View File

@ -21,16 +21,19 @@ package org.apache.hadoop.yarn.client;
import org.apache.hadoop.ha.BadFencingConfigurationException; import org.apache.hadoop.ha.BadFencingConfigurationException;
import org.apache.hadoop.ha.HAServiceTarget; import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer; import org.apache.hadoop.ha.NodeFencer;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
public class RMHAServiceTarget extends HAServiceTarget { public class RMHAServiceTarget extends HAServiceTarget {
private InetSocketAddress haAdminServiceAddress; private final boolean autoFailoverEnabled;
private final InetSocketAddress haAdminServiceAddress;
public RMHAServiceTarget(YarnConfiguration conf) public RMHAServiceTarget(YarnConfiguration conf)
throws IOException { throws IOException {
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
haAdminServiceAddress = conf.getSocketAddr( haAdminServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@ -44,19 +47,23 @@ public class RMHAServiceTarget extends HAServiceTarget {
@Override @Override
public InetSocketAddress getZKFCAddress() { public InetSocketAddress getZKFCAddress() {
// TODO (YARN-1177): Hook up ZKFC information // TODO (YARN-1177): ZKFC implementation
return null; throw new UnsupportedOperationException("RMHAServiceTarget doesn't have " +
"a corresponding ZKFC address");
} }
@Override @Override
public NodeFencer getFencer() { public NodeFencer getFencer() {
// TODO (YARN-1026): Hook up fencing implementation
return null; return null;
} }
@Override @Override
public void checkFencingConfigured() public void checkFencingConfigured() throws BadFencingConfigurationException {
throws BadFencingConfigurationException { throw new BadFencingConfigurationException("Fencer not configured");
// TODO (YARN-1026): Based on fencing implementation }
@Override
public boolean isAutoFailoverEnabled() {
return autoFailoverEnabled;
} }
} }

View File

@ -264,13 +264,20 @@
<property> <property>
<description>Enable RM to recover state after starting. If true, then <description>Enable RM to recover state after starting. If true, then
yarn.resourcemanager.store.class must be specified</description> yarn.resourcemanager.store.class must be specified. </description>
<name>yarn.resourcemanager.recovery.enabled</name> <name>yarn.resourcemanager.recovery.enabled</name>
<value>false</value> <value>false</value>
</property> </property>
<property> <property>
<description>The class to use as the persistent store.</description> <description>The class to use as the persistent store.
If org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
is used, the store is implicitly fenced, meaning only a single ResourceManager
is able to use the store at any point in time. More details on this
implicit fencing, along with setting up appropriate ACLs, are discussed
under yarn.resourcemanager.zk-state-store.root-node.acl.
</description>
<name>yarn.resourcemanager.store.class</name> <name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
</property> </property>
@ -291,31 +298,24 @@
</property> </property>
<property> <property>
<description>Host:Port of the ZooKeeper server where RM state will <description>Host:Port of the ZooKeeper server to be used by the RM. This
be stored. This must be supplied when using must be supplied when using the ZooKeeper based implementation of the
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore RM state store and/or embedded automatic failover in a HA setting.
as the value for yarn.resourcemanager.store.class. ZKRMStateStore </description>
is implicitly fenced, meaning a single ResourceManager is <name>yarn.resourcemanager.zk-address</name>
able to use the store at any point in time. More details on this, along
with setting up appropriate ACLs is discussed under the description for
yarn.resourcemanager.zk-state-store.root-node.acl.</description>
<name>yarn.resourcemanager.zk-state-store.address</name>
<!--value>127.0.0.1:2181</value--> <!--value>127.0.0.1:2181</value-->
</property> </property>
<property> <property>
<description>Number of times ZKRMStateStore tries to connect to <description>Number of times RM tries to connect to ZooKeeper.</description>
ZooKeeper. This may be supplied when using <name>yarn.resourcemanager.zk-num-retries</name>
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk-state-store.num-retries</name>
<value>500</value> <value>500</value>
</property> </property>
<property> <property>
<description>Retry interval in milliseconds when ZKRMStateStore tries to <description>Retry interval in milliseconds when connecting to ZooKeeper.
connect to ZooKeeper.</description> </description>
<name>yarn.resourcemanager.zk-state-store.retry-interval-ms</name> <name>yarn.resourcemanager.zk-retry-interval-ms</name>
<value>2000</value> <value>2000</value>
</property> </property>
@ -333,20 +333,14 @@
is managed by the ZooKeeper cluster itself, not by the client. This value is is managed by the ZooKeeper cluster itself, not by the client. This value is
used by the cluster to determine when the client's session expires. used by the cluster to determine when the client's session expires.
Expiration happens when the cluster does not hear from the client within Expiration happens when the cluster does not hear from the client within
the specified session timeout period (i.e. no heartbeat). the specified session timeout period (i.e. no heartbeat).</description>
This may be supplied when using <name>yarn.resourcemanager.zk-timeout-ms</name>
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk-state-store.timeout-ms</name>
<value>60000</value> <value>60000</value>
</property> </property>
<property> <property>
<description>ACL's to be used for ZooKeeper znodes. <description>ACL's to be used for ZooKeeper znodes.</description>
This may be supplied when using <name>yarn.resourcemanager.zk-acl</name>
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
<name>yarn.resourcemanager.zk-state-store.acl</name>
<value>world:anyone:rwcda</value> <value>world:anyone:rwcda</value>
</property> </property>
@ -362,7 +356,7 @@
permissions. permissions.
By default, when this property is not set, we use the ACLs from By default, when this property is not set, we use the ACLs from
yarn.resourcemanager.zk-state-store.acl for shared admin access and yarn.resourcemanager.zk-acl for shared admin access and
rm-address:cluster-timestamp for username-based exclusive create-delete rm-address:cluster-timestamp for username-based exclusive create-delete
access. access.
@ -408,6 +402,36 @@
<value>false</value> <value>false</value>
</property> </property>
<property>
<description>Enable automatic failover.</description>
<name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
<value>false</value>
</property>
<property>
<description>Enable embedded automatic failover. The embedded elector
relies on the RM state store to handle fencing, and is primarily intended
to be used in conjunction with ZKRMStateStore.</description>
<name>yarn.resourcemanager.ha.automatic-failover.embedded</name>
<value>false</value>
</property>
<property>
<description>The base znode path to use for storing leader information,
when using ZooKeeper based leader election.</description>
<name>yarn.resourcemanager.ha.automatic-failover.zk-base-path</name>
<value>/yarn-leader-election</value>
</property>
<property>
<description>Name of the cluster. In an HA setting,
this is used to ensure the RM participates in leader
election for this cluster and does not affect
other clusters.</description>
<name>yarn.resourcemanager.cluster-id</name>
<!--value>yarn-cluster</value-->
</property>
<property> <property>
<description>The list of RM nodes in the cluster when HA is <description>The list of RM nodes in the cluster when HA is
enabled. See description of yarn.resourcemanager.ha enabled. See description of yarn.resourcemanager.ha

View File

@ -20,11 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -41,14 +38,16 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -75,7 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicy
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
public class AdminService extends AbstractService implements public class AdminService extends CompositeService implements
HAServiceProtocol, ResourceManagerAdministrationProtocol { HAServiceProtocol, ResourceManagerAdministrationProtocol {
private static final Log LOG = LogFactory.getLog(AdminService.class); private static final Log LOG = LogFactory.getLog(AdminService.class);
@ -84,6 +83,8 @@ public class AdminService extends AbstractService implements
private final ResourceManager rm; private final ResourceManager rm;
private String rmId; private String rmId;
private boolean autoFailoverEnabled;
private Server server; private Server server;
private InetSocketAddress masterServiceAddress; private InetSocketAddress masterServiceAddress;
private AccessControlList adminAcl; private AccessControlList adminAcl;
@ -99,6 +100,15 @@ public class AdminService extends AbstractService implements
@Override @Override
public synchronized void serviceInit(Configuration conf) throws Exception { public synchronized void serviceInit(Configuration conf) throws Exception {
if (rmContext.isHAEnabled()) {
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
if (autoFailoverEnabled) {
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
addIfService(createEmbeddedElectorService());
}
}
}
masterServiceAddress = conf.getSocketAddr( masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@ -162,6 +172,10 @@ public class AdminService extends AbstractService implements
} }
} }
/**
 * Creates the embedded elector service bound to this RM's context.
 * Protected, so a subclass can supply a different instance.
 *
 * @return a new EmbeddedElectorService constructed with {@code rmContext}
 */
protected EmbeddedElectorService createEmbeddedElectorService() {
return new EmbeddedElectorService(rmContext);
}
private UserGroupInformation checkAccess(String method) throws IOException { private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAccess(adminAcl, method, LOG); return RMServerUtils.verifyAccess(adminAcl, method, LOG);
} }
@ -174,6 +188,43 @@ public class AdminService extends AbstractService implements
} }
} }
/**
 * Check that a request to change this node's HA state is valid, based on
 * the source of the request and whether automatic failover is enabled:
 * <ul>
 *   <li>REQUEST_BY_USER: rejected when automatic failover is enabled.</li>
 *   <li>REQUEST_BY_USER_FORCED: always allowed; a warning is logged when
 *       automatic failover is enabled.</li>
 *   <li>REQUEST_BY_ZKFC: rejected when automatic failover is not
 *       enabled.</li>
 * </ul>
 * Any other source passes through without a check, as the switch has no
 * default branch.
 *
 * @param req the request to check
 * @throws AccessControlException if the request is disallowed
 */
private void checkHaStateChange(StateChangeRequestInfo req)
throws AccessControlException {
switch (req.getSource()) {
case REQUEST_BY_USER:
// A plain manual request must not override automatic failover.
if (autoFailoverEnabled) {
throw new AccessControlException(
"Manual failover for this ResourceManager is disallowed, " +
"because automatic failover is enabled.");
}
break;
case REQUEST_BY_USER_FORCED:
// The force flag overrides automatic failover; log it for the record.
if (autoFailoverEnabled) {
LOG.warn("Allowing manual failover from " +
org.apache.hadoop.ipc.Server.getRemoteAddress() +
" even though automatic failover is enabled, because the user " +
"specified the force flag");
}
break;
case REQUEST_BY_ZKFC:
// A failover controller may only act when automatic failover is on.
if (!autoFailoverEnabled) {
throw new AccessControlException(
"Request from ZK failover controller at " +
org.apache.hadoop.ipc.Server.getRemoteAddress() + " denied " +
"since automatic failover is not enabled");
}
break;
}
}
private synchronized boolean isRMActive() { private synchronized boolean isRMActive() {
return HAServiceState.ACTIVE == rmContext.getHAServiceState(); return HAServiceState.ACTIVE == rmContext.getHAServiceState();
} }
@ -196,8 +247,7 @@ public class AdminService extends AbstractService implements
public synchronized void transitionToActive( public synchronized void transitionToActive(
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException { HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
UserGroupInformation user = checkAccess("transitionToActive"); UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled, checkHaStateChange(reqInfo);
// check if transition should be allowed for this request
try { try {
rm.transitionToActive(); rm.transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(), RMAuditLogger.logSuccess(user.getShortUserName(),
@ -215,8 +265,7 @@ public class AdminService extends AbstractService implements
public synchronized void transitionToStandby( public synchronized void transitionToStandby(
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException { HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby"); UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled, checkHaStateChange(reqInfo);
// check if transition should be allowed for this request
try { try {
rm.transitionToStandby(true); rm.transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(), RMAuditLogger.logSuccess(user.getShortUserName(),

View File

@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
 * Service that embeds an {@link ActiveStandbyElector} in the ResourceManager
 * and translates the elector's callbacks into HA state transitions issued
 * through the RM's admin service.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class EmbeddedElectorService extends AbstractService
    implements ActiveStandbyElector.ActiveStandbyElectorCallback {
  private static final Log LOG =
      LogFactory.getLog(EmbeddedElectorService.class.getName());

  /**
   * Request info attached to every transition triggered by the elector;
   * marked as originating from a ZK failover controller.
   */
  private static final HAServiceProtocol.StateChangeRequestInfo req =
      new HAServiceProtocol.StateChangeRequestInfo(
          HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);

  private RMContext rmContext;
  /** Serialized ActiveRMInfoProto (cluster id + RM id) used when joining. */
  private byte[] localActiveNodeInfo;
  /** Created in serviceInit; remains null if initialization fails early. */
  private ActiveStandbyElector elector;

  EmbeddedElectorService(RMContext rmContext) {
    super(EmbeddedElectorService.class.getName());
    this.rmContext = rmContext;
  }

  /**
   * Reads the ZooKeeper quorum, cluster id, base path, session timeout and
   * ACLs from the configuration, creates the elector, and checks that any
   * active-node data already stored belongs to this cluster.
   *
   * @param conf the service configuration
   * @throws YarnRuntimeException if the ZK address or cluster id is not
   *         configured, or if the configured ACLs cannot be parsed
   */
  @Override
  protected synchronized void serviceInit(Configuration conf)
      throws Exception {
    conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);

    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
    if (zkQuorum == null) {
      throw new YarnRuntimeException("Embedded automatic failover " +
          "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
          " is not set");
    }

    String rmId = HAUtil.getRMHAId(conf);
    String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID);
    if (clusterId == null) {
      throw new YarnRuntimeException(YarnConfiguration.RM_CLUSTER_ID +
          " is not specified!");
    }
    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
    // Each cluster elects under its own znode below the base path.
    String electionZNode = zkBasePath + "/" + clusterId;

    long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

    String zkAclConf = conf.get(YarnConfiguration.RM_ZK_ACL,
        YarnConfiguration.DEFAULT_RM_ZK_ACL);
    List<ACL> zkAcls;
    try {
      zkAcls = ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
    } catch (ZKUtil.BadAclFormatException bafe) {
      // Keep the parser's exception as the cause so the bad entry is visible.
      throw new YarnRuntimeException(
          YarnConfiguration.RM_ZK_ACL + " has ill-formatted ACLs", bafe);
    }

    // TODO (YARN-1528): ZKAuthInfo to be set for rm-store and elector
    List<ZKUtil.ZKAuthInfo> zkAuths = Collections.emptyList();

    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
        electionZNode, zkAcls, zkAuths, this);

    elector.ensureParentZNode();
    if (!isParentZnodeSafe(clusterId)) {
      notifyFatalError(electionZNode + " znode has invalid data! "+
          "Might need formatting!");
    }

    super.serviceInit(conf);
  }

  /** Joins the election, advertising this RM's serialized node info. */
  @Override
  protected synchronized void serviceStart() throws Exception {
    elector.joinElection(localActiveNodeInfo);
    super.serviceStart();
  }

  /**
   * Quits the election and closes the elector's connection. The elector may
   * be null when serviceInit failed before constructing it; in that case
   * there is nothing to release.
   */
  @Override
  protected synchronized void serviceStop() throws Exception {
    if (elector != null) {
      elector.quitElection(false);
      elector.terminateConnection();
    }
    super.serviceStop();
  }

  /**
   * Elector callback: asks the admin service to transition the RM to active.
   *
   * @throws ServiceFailedException wrapping any failure of the transition
   */
  @Override
  public synchronized void becomeActive() throws ServiceFailedException {
    try {
      rmContext.getRMAdminService().transitionToActive(req);
    } catch (Exception e) {
      throw new ServiceFailedException("RM could not transition to Active", e);
    }
  }

  /**
   * Elector callback: asks the admin service to transition the RM to
   * standby. Failures are logged rather than propagated.
   */
  @Override
  public synchronized void becomeStandby() {
    try {
      rmContext.getRMAdminService().transitionToStandby(req);
    } catch (Exception e) {
      LOG.error("RM could not transition to Standby", e);
    }
  }

  @Override
  public void enterNeutralMode() {
    /*
     * Possibly due to transient connection issues. Do nothing.
     * TODO: Might want to keep track of how long in this state and transition
     * to standby.
     */
  }

  /**
   * Elector callback: forwards the error to the RM dispatcher as an
   * {@code EMBEDDED_ELECTOR_FAILED} fatal event.
   *
   * @param errorMessage description of the fatal condition
   */
  @SuppressWarnings(value = "unchecked")
  @Override
  public synchronized void notifyFatalError(String errorMessage) {
    rmContext.getDispatcher().getEventHandler().handle(
        new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
  }

  /** Elector callback: fencing is not supported here, so this only logs. */
  @Override
  public synchronized void fenceOldActive(byte[] oldActiveData) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Request to fence old active being ignored, " +
          "as embedded leader election doesn't support fencing");
    }
  }

  /**
   * Serializes the cluster id and RM id into an ActiveRMInfoProto.
   *
   * @param clusterId id of the cluster this RM belongs to
   * @param rmId id of this RM
   * @return the protobuf-encoded bytes
   */
  private static byte[] createActiveNodeInfo(String clusterId, String rmId)
      throws IOException {
    return YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
        .newBuilder()
        .setClusterid(clusterId)
        .setRmId(rmId)
        .build()
        .toByteArray();
  }

  /**
   * Checks whether the active-node data currently held by the elector is
   * compatible with this RM.
   *
   * @param clusterId the cluster id this RM is configured with
   * @return true if no active node is recorded or the recorded node is in
   *         the same cluster; false if the data cannot be parsed or names a
   *         different cluster
   */
  private synchronized boolean isParentZnodeSafe(String clusterId)
      throws InterruptedException, IOException, KeeperException {
    byte[] data;
    try {
      data = elector.getActiveData();
    } catch (ActiveStandbyElector.ActiveNotFoundException e) {
      // no active found, parent znode is safe
      return true;
    }

    YarnServerResourceManagerServiceProtos.ActiveRMInfoProto proto;
    try {
      proto = YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
          .parseFrom(data);
    } catch (InvalidProtocolBufferException e) {
      LOG.error("Invalid data in ZK: " + StringUtils.byteToHexString(data));
      return false;
    }

    // Check if the passed proto corresponds to an RM in the same cluster
    if (!proto.getClusterid().equals(clusterId)) {
      LOG.error("Mismatched cluster! The other RM seems " +
          "to be from a different cluster. Current cluster = " + clusterId +
          ", Other RM's cluster = " + proto.getClusterid());
      return false;
    }
    return true;
  }
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.AbstractEvent;
/**
 * Event of type {@link RMFatalEventType} that carries a textual description
 * of what went wrong.
 */
public class RMFatalEvent extends AbstractEvent<RMFatalEventType> {
  /** Human-readable cause; a stringified stack trace when built from an exception. */
  private String cause;

  /**
   * @param rmFatalEventType the kind of fatal event
   * @param cause description of the failure
   */
  public RMFatalEvent(RMFatalEventType rmFatalEventType, String cause) {
    super(rmFatalEventType);
    this.cause = cause;
  }

  /**
   * @param rmFatalEventType the kind of fatal event
   * @param cause the exception behind the failure; stored in stringified form
   */
  public RMFatalEvent(RMFatalEventType rmFatalEventType, Exception cause) {
    this(rmFatalEventType, StringUtils.stringifyException(cause));
  }

  /** @return the stored cause text */
  public String getCause() {
    return cause;
  }
}

View File

@ -15,9 +15,17 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
public enum RMStateStoreOperationFailedEventType { package org.apache.hadoop.yarn.server.resourcemanager;
FENCED, // Store operation failed because it was fenced
FAILED // Store operation failed for no known reason import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public enum RMFatalEventType {
// Source <- Store
STATE_STORE_FENCED,
STATE_STORE_OP_FAILED,
// Source <- Embedded Elector
EMBEDDED_ELECTOR_FAILED
} }

View File

@ -59,8 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -121,6 +119,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
*/ */
@VisibleForTesting @VisibleForTesting
protected RMContextImpl rmContext; protected RMContextImpl rmContext;
private Dispatcher rmDispatcher;
@VisibleForTesting @VisibleForTesting
protected AdminService adminService; protected AdminService adminService;
@ -134,7 +133,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
*/ */
protected RMActiveServices activeServices; protected RMActiveServices activeServices;
protected RMSecretManagerService rmSecretManagerService; protected RMSecretManagerService rmSecretManagerService;
private Dispatcher rmDispatcher;
protected ResourceScheduler scheduler; protected ResourceScheduler scheduler;
private ClientRMService clientRM; private ClientRMService clientRM;
@ -179,6 +177,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.conf = conf; this.conf = conf;
this.rmContext = new RMContextImpl(); this.rmContext = new RMContextImpl();
rmDispatcher = createDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
rmDispatcher.register(RMFatalEventType.class,
new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
adminService = createAdminService(); adminService = createAdminService();
addService(adminService); addService(adminService);
rmContext.setRMAdminService(adminService); rmContext.setRMAdminService(adminService);
@ -207,11 +212,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new SchedulerEventDispatcher(this.scheduler); return new SchedulerEventDispatcher(this.scheduler);
} }
protected RMStateStoreOperationFailedEventDispatcher
createRMStateStoreOperationFailedEventDispatcher() {
return new RMStateStoreOperationFailedEventDispatcher(rmContext, this);
}
protected Dispatcher createDispatcher() { protected Dispatcher createDispatcher() {
return new AsyncDispatcher(); return new AsyncDispatcher();
} }
@ -297,10 +297,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected void serviceInit(Configuration configuration) throws Exception { protected void serviceInit(Configuration configuration) throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmDispatcher = createDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
rmSecretManagerService = createRMSecretManagerService(); rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService); addService(rmSecretManagerService);
@ -332,8 +328,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
try { try {
rmStore.init(conf); rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher); rmStore.setRMDispatcher(rmDispatcher);
rmDispatcher.register(RMStateStoreOperationFailedEventType.class,
createRMStateStoreOperationFailedEventDispatcher());
} catch (Exception e) { } catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for // the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced // HA and we need to give up master status if we got fenced
@ -605,26 +599,23 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
@Private @Private
public static class RMStateStoreOperationFailedEventDispatcher implements public static class RMFatalEventDispatcher
EventHandler<RMStateStoreOperationFailedEvent> { implements EventHandler<RMFatalEvent> {
private final RMContext rmContext; private final RMContext rmContext;
private final ResourceManager rm; private final ResourceManager rm;
public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext, public RMFatalEventDispatcher(
ResourceManager resourceManager) { RMContext rmContext, ResourceManager resourceManager) {
this.rmContext = rmContext; this.rmContext = rmContext;
this.rm = resourceManager; this.rm = resourceManager;
} }
@Override @Override
public void handle(RMStateStoreOperationFailedEvent event) { public void handle(RMFatalEvent event) {
if (LOG.isDebugEnabled()) { LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
LOG.debug("Received a " +
RMStateStoreOperationFailedEvent.class.getName() + " of type " +
event.getType().name()); event.getType().name());
}
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) { if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) {
LOG.info("RMStateStore has been fenced"); LOG.info("RMStateStore has been fenced");
if (rmContext.isHAEnabled()) { if (rmContext.isHAEnabled()) {
try { try {
@ -633,14 +624,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
rm.transitionToStandby(true); rm.transitionToStandby(true);
return; return;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to transition RM to Standby mode."); LOG.fatal("Failed to transition RM to Standby mode.");
} }
} }
} }
LOG.error("Shutting down RM on receiving a " +
RMStateStoreOperationFailedEvent.class.getName() + " of type " +
event.getType().name());
ExitUtil.terminate(1, event.getCause()); ExitUtil.terminate(1, event.getCause());
} }
} }

View File

@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@ -679,15 +681,13 @@ public abstract class RMStateStore extends AbstractService {
* @param failureCause the exception due to which the operation failed * @param failureCause the exception due to which the operation failed
*/ */
private void notifyStoreOperationFailed(Exception failureCause) { private void notifyStoreOperationFailed(Exception failureCause) {
RMStateStoreOperationFailedEventType type; RMFatalEventType type;
if (failureCause instanceof StoreFencedException) { if (failureCause instanceof StoreFencedException) {
type = RMStateStoreOperationFailedEventType.FENCED; type = RMFatalEventType.STATE_STORE_FENCED;
} else { } else {
type = RMStateStoreOperationFailedEventType.FAILED; type = RMFatalEventType.STATE_STORE_OP_FAILED;
} }
rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
rmDispatcher.getEventHandler().handle(
new RMStateStoreOperationFailedEvent(type, failureCause));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -1,36 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class RMStateStoreOperationFailedEvent
extends AbstractEvent<RMStateStoreOperationFailedEventType> {
private Exception cause;
RMStateStoreOperationFailedEvent(
RMStateStoreOperationFailedEventType type, Exception cause) {
super(type);
this.cause = cause;
}
public Exception getCause() {
return this.cause;
}
}

View File

@ -182,34 +182,34 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
public synchronized void initInternal(Configuration conf) throws Exception { public synchronized void initInternal(Configuration conf) throws Exception {
zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS); zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkHostPort == null) { if (zkHostPort == null) {
throw new YarnRuntimeException("No server address specified for " + throw new YarnRuntimeException("No server address specified for " +
"zookeeper state store for Resource Manager recovery. " + "zookeeper state store for Resource Manager recovery. " +
YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS + " is not configured."); YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
} }
numRetries = numRetries =
conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_NUM_RETRIES, conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES); YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
znodeWorkingPath = znodeWorkingPath =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkSessionTimeout = zkSessionTimeout =
conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS); YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
zkRetryInterval = zkRetryInterval =
conf.getLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS); YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
// Parse authentication from configuration. // Parse authentication from configuration.
String zkAclConf = String zkAclConf =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL, conf.get(YarnConfiguration.RM_ZK_ACL,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_ACL); YarnConfiguration.DEFAULT_RM_ZK_ACL);
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
try { try {
zkAcl = ZKUtil.parseACLs(zkAclConf); zkAcl = ZKUtil.parseACLs(zkAclConf);
} catch (ZKUtil.BadAclFormatException bafe) { } catch (ZKUtil.BadAclFormatException bafe) {
LOG.error("Invalid format for " + YarnConfiguration.ZK_RM_STATE_STORE_ACL); LOG.error("Invalid format for " + YarnConfiguration.RM_ZK_ACL);
throw bafe; throw bafe;
} }

View File

@ -396,6 +396,11 @@ public class MockRM extends ResourceManager {
protected void stopServer() { protected void stopServer() {
// don't do anything // don't do anything
} }
// Mock override: supplies no elector service at all.
// NOTE(review): presumably the caller's addIfService(...) skips a null
// result, so no elector is registered in tests - confirm.
@Override
protected EmbeddedElectorService createEmbeddedElectorService() {
return null;
}
}; };
} }

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HealthCheckFailedException; import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.junit.Before; import org.junit.Before;
@ -39,6 +40,7 @@ import static org.junit.Assert.fail;
public class TestRMHA { public class TestRMHA {
private Log LOG = LogFactory.getLog(TestRMHA.class); private Log LOG = LogFactory.getLog(TestRMHA.class);
private final Configuration configuration = new YarnConfiguration();
private MockRM rm = null; private MockRM rm = null;
private static final String STATE_ERR = private static final String STATE_ERR =
"ResourceManager is in wrong HA state"; "ResourceManager is in wrong HA state";
@ -51,17 +53,13 @@ public class TestRMHA {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
Configuration conf = new YarnConfiguration(); configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) { for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS); configuration.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS); configuration.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
} }
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID); configuration.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
rm = new MockRM(conf);
rm.init(conf);
} }
private void checkMonitorHealth() throws IOException { private void checkMonitorHealth() throws IOException {
@ -113,6 +111,9 @@ public class TestRMHA {
*/ */
@Test (timeout = 30000) @Test (timeout = 30000)
public void testStartAndTransitions() throws IOException { public void testStartAndTransitions() throws IOException {
Configuration conf = new YarnConfiguration(configuration);
rm = new MockRM(conf);
rm.init(conf);
StateChangeRequestInfo requestInfo = new StateChangeRequestInfo( StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER); HAServiceProtocol.RequestSource.REQUEST_BY_USER);
@ -162,4 +163,63 @@ public class TestRMHA {
rm.areActiveServicesRunning()); rm.areActiveServicesRunning());
checkMonitorHealth(); checkMonitorHealth();
} }
/**
 * With automatic failover enabled, plain user requests
 * (REQUEST_BY_USER) to change HA state must fail with an
 * AccessControlException, while forced user requests
 * (REQUEST_BY_USER_FORCED) must not be rejected that way.
 */
@Test
public void testTransitionsWhenAutomaticFailoverEnabled() throws IOException {
final String ERR_UNFORCED_REQUEST = "User request succeeded even when " +
"automatic failover is enabled";
// Copy the shared base configuration and switch automatic failover on.
Configuration conf = new YarnConfiguration(configuration);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
// Phase 1: unforced user requests are expected to be rejected.
StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
// Transition to standby
try {
rm.adminService.transitionToStandby(requestInfo);
fail(ERR_UNFORCED_REQUEST);
} catch (AccessControlException e) {
// expected
}
checkMonitorHealth();
checkStandbyRMFunctionality();
// Transition to active
try {
rm.adminService.transitionToActive(requestInfo);
fail(ERR_UNFORCED_REQUEST);
} catch (AccessControlException e) {
// expected
}
// The rejected request must leave the RM passing the standby checks.
checkMonitorHealth();
checkStandbyRMFunctionality();
final String ERR_FORCED_REQUEST = "Forced request by user should work " +
"even if automatic failover is enabled";
// Phase 2: forced user requests must not raise AccessControlException.
requestInfo = new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED);
// Transition to standby
try {
rm.adminService.transitionToStandby(requestInfo);
} catch (AccessControlException e) {
fail(ERR_FORCED_REQUEST);
}
checkMonitorHealth();
checkStandbyRMFunctionality();
// Transition to active
try {
rm.adminService.transitionToActive(requestInfo);
} catch (AccessControlException e) {
fail(ERR_FORCED_REQUEST);
}
// The forced transition is expected to leave the RM passing the active checks.
checkMonitorHealth();
checkActiveRMFunctionality();
}
} }

View File

@ -92,7 +92,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
public RMStateStore getRMStateStore() throws Exception { public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/Test"; workingZnode = "/Test";
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient(); this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode); this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
@ -140,7 +140,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
conf.set(YarnConfiguration.RM_HA_IDS, rmIds); conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.RM_HA_ID, rmId); conf.set(YarnConfiguration.RM_HA_ID, rmId);
for (String rpcAddress : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) { for (String rpcAddress : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
for (String id : HAUtil.getRMHAIds(conf)) { for (String id : HAUtil.getRMHAIds(conf)) {

View File

@ -107,7 +107,7 @@ public class TestZKRMStateStoreZKClientConnections extends
public RMStateStore getRMStateStore(Configuration conf) throws Exception { public RMStateStore getRMStateStore(Configuration conf) throws Exception {
String workingZnode = "/Test"; String workingZnode = "/Test";
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
watcher = new TestForwardingWatcher(); watcher = new TestForwardingWatcher();
this.store = new TestZKRMStateStore(conf, workingZnode); this.store = new TestZKRMStateStore(conf, workingZnode);
@ -120,8 +120,8 @@ public class TestZKRMStateStoreZKClientConnections extends
TestZKClient zkClientTester = new TestZKClient(); TestZKClient zkClientTester = new TestZKClient();
final String path = "/test"; final String path = "/test";
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 1000); conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 1000);
conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100); conf.setLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, 100);
final ZKRMStateStore store = final ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf); (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher(); TestDispatcher dispatcher = new TestDispatcher();
@ -153,7 +153,7 @@ public class TestZKRMStateStoreZKClientConnections extends
TestZKClient zkClientTester = new TestZKClient(); TestZKClient zkClientTester = new TestZKClient();
String path = "/test"; String path = "/test";
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100); conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 100);
ZKRMStateStore store = ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf); (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher(); TestDispatcher dispatcher = new TestDispatcher();
@ -195,7 +195,7 @@ public class TestZKRMStateStoreZKClientConnections extends
TestZKClient zkClientTester = new TestZKClient(); TestZKClient zkClientTester = new TestZKClient();
String path = "/test"; String path = "/test";
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100); conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 100);
ZKRMStateStore store = ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf); (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher(); TestDispatcher dispatcher = new TestDispatcher();
@ -227,7 +227,7 @@ public class TestZKRMStateStoreZKClientConnections extends
public void testSetZKAcl() { public void testSetZKAcl() {
TestZKClient zkClientTester = new TestZKClient(); TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "world:anyone:rwca"); conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca");
try { try {
zkClientTester.store.zkClient.delete(zkClientTester.store zkClientTester.store.zkClient.delete(zkClientTester.store
.znodeWorkingPath, -1); .znodeWorkingPath, -1);
@ -240,7 +240,7 @@ public class TestZKRMStateStoreZKClientConnections extends
public void testInvalidZKAclConfiguration() { public void testInvalidZKAclConfiguration() {
TestZKClient zkClientTester = new TestZKClient(); TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "randomstring&*"); conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*");
try { try {
zkClientTester.getRMStateStore(conf); zkClientTester.getRMStateStore(conf);
fail("ZKRMStateStore created with bad ACL"); fail("ZKRMStateStore created with bad ACL");

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@ -98,6 +99,7 @@ public class MiniYARNCluster extends CompositeService {
private boolean useFixedPorts; private boolean useFixedPorts;
private boolean useRpc = false; private boolean useRpc = false;
private int failoverTimeout;
private ConcurrentMap<ApplicationAttemptId, Long> appMasters = private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2); new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
@ -189,12 +191,15 @@ public class MiniYARNCluster extends CompositeService {
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS); YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC); YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
if (useRpc && !useFixedPorts) { if (useRpc && !useFixedPorts) {
throw new YarnRuntimeException("Invalid configuration!" + throw new YarnRuntimeException("Invalid configuration!" +
" Minicluster can use rpc only when configured to use fixed ports"); " Minicluster can use rpc only when configured to use fixed ports");
} }
conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
if (resourceManagers.length > 1) { if (resourceManagers.length > 1) {
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
if (conf.get(YarnConfiguration.RM_HA_IDS) == null) { if (conf.get(YarnConfiguration.RM_HA_IDS) == null) {
@ -218,6 +223,13 @@ public class MiniYARNCluster extends CompositeService {
// Don't try to login using keytab in the testcases. // Don't try to login using keytab in the testcases.
} }
}; };
if (!useFixedPorts) {
if (HAUtil.isHAEnabled(conf)) {
setHARMConfiguration(i, conf);
} else {
setNonHARMConfiguration(conf);
}
}
addService(new ResourceManagerWrapper(i)); addService(new ResourceManagerWrapper(i));
} }
for(int index = 0; index < nodeManagers.length; index++) { for(int index = 0; index < nodeManagers.length; index++) {
@ -230,18 +242,103 @@ public class MiniYARNCluster extends CompositeService {
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
} }
/**
 * Configures the ResourceManager endpoints for a non-HA mini cluster.
 *
 * The RM, admin, scheduler and resource-tracker address keys are all set to
 * "&lt;hostname&gt;:0", and the web application is given the same hostname
 * with port 0. Port 0 presumably lets the system choose a free port (to be
 * confirmed against the RPC/web server code).
 *
 * @param conf configuration that receives the address settings
 */
private void setNonHARMConfiguration(Configuration conf) {
  final String host = MiniYARNCluster.getHostname();
  final String hostWithAnyPort = host + ":0";

  conf.set(YarnConfiguration.RM_ADDRESS, hostWithAnyPort);
  conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostWithAnyPort);
  conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostWithAnyPort);
  conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostWithAnyPort);
  WebAppUtils.setRMWebAppHostnameAndPort(conf, host, 0);
}
/**
 * Configures the service addresses of a single ResourceManager in an HA
 * mini cluster.
 *
 * Every key in YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS is suffixed
 * with the id of the RM at the given index and set to "&lt;hostname&gt;:0".
 *
 * @param index position of the ResourceManager in rmIds
 * @param conf configuration that receives the per-RM address settings
 */
private void setHARMConfiguration(final int index, Configuration conf) {
  final String hostWithAnyPort = MiniYARNCluster.getHostname() + ":0";
  for (String serviceKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
    final String perRmKey = HAUtil.addSuffix(serviceKey, rmIds[index]);
    conf.set(perRmKey, hostWithAnyPort);
  }
}
/**
 * Initializes the ResourceManager at the given index and starts tracking
 * application-master registrations on it.
 *
 * When HA is enabled, the RM's own id is written to the configuration before
 * init. After init, a handler is registered on the RM dispatcher that records
 * an attempt in appMasters on registration and removes it on unregistration.
 *
 * @param index position of the ResourceManager in resourceManagers
 * @param conf configuration passed to the ResourceManager's init
 */
private synchronized void initResourceManager(int index, Configuration conf) {
  if (HAUtil.isHAEnabled(conf)) {
    conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
  }

  final ResourceManager rm = resourceManagers[index];
  rm.init(conf);

  // Keeps appMasters in sync with AM registration/unregistration events.
  final EventHandler<RMAppAttemptEvent> appMasterTracker =
      new EventHandler<RMAppAttemptEvent>() {
        public void handle(RMAppAttemptEvent event) {
          if (event instanceof RMAppAttemptRegistrationEvent) {
            appMasters.put(event.getApplicationAttemptId(),
                event.getTimestamp());
            return;
          }
          if (event instanceof RMAppAttemptUnregistrationEvent) {
            appMasters.remove(event.getApplicationAttemptId());
          }
        }
      };
  rm.getRMContext().getDispatcher().register(
      RMAppAttemptEventType.class, appMasterTracker);
}
/**
 * Starts the ResourceManager at the given index on its own thread and waits
 * for it to leave the INITED state.
 *
 * The RM is started on a thread named "RM-&lt;index&gt;". This method then
 * polls the service state up to 60 times, sleeping 1500 ms between polls,
 * and fails if the RM is not in the STARTED state afterwards.
 *
 * @param index position of the ResourceManager in resourceManagers
 * @throws YarnRuntimeException wrapping any failure, including the RM not
 *         reaching STARTED or the waiting thread being interrupted
 */
private synchronized void startResourceManager(final int index) {
  try {
    Thread rmThread = new Thread() {
      @Override
      public void run() {
        resourceManagers[index].start();
      }
    };
    rmThread.setName("RM-" + index);
    rmThread.start();

    int waitCount = 0;
    while (resourceManagers[index].getServiceState() == STATE.INITED
        && waitCount++ < 60) {
      LOG.info("Waiting for RM to start...");
      Thread.sleep(1500);
    }
    if (resourceManagers[index].getServiceState() != STATE.STARTED) {
      // RM could have failed.
      throw new IOException(
          "ResourceManager failed to start. Final state is "
              + resourceManagers[index].getServiceState());
    }
  } catch (Throwable t) {
    if (t instanceof InterruptedException) {
      // Wrapping would otherwise hide the interruption from callers that
      // check the thread's interrupt status.
      Thread.currentThread().interrupt();
    }
    throw new YarnRuntimeException(t);
  }
  LOG.info("MiniYARN ResourceManager address: " +
      getConfig().get(YarnConfiguration.RM_ADDRESS));
  LOG.info("MiniYARN ResourceManager web address: " +
      WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
}
/**
 * Stops the ResourceManager at the given index and clears its slot.
 *
 * Does nothing when the slot is already empty.
 *
 * @param index position of the ResourceManager in resourceManagers
 */
@InterfaceAudience.Private
@VisibleForTesting
public synchronized void stopResourceManager(int index) {
  final ResourceManager rm = resourceManagers[index];
  if (rm == null) {
    return;
  }
  rm.stop();
  resourceManagers[index] = null;
}
/**
 * Replaces the ResourceManager at the given index with a fresh instance.
 *
 * Any RM currently in the slot is stopped first; a new ResourceManager is
 * then created, initialized with this cluster's configuration, and started.
 *
 * @param index position of the ResourceManager in resourceManagers
 * @throws InterruptedException declared for callers; nothing in this method
 *         throws it directly
 */
@InterfaceAudience.Private
@VisibleForTesting
public synchronized void restartResourceManager(int index)
    throws InterruptedException {
  // Reuse the stop logic instead of duplicating it here.
  stopResourceManager(index);

  final Configuration conf = getConfig();
  resourceManagers[index] = new ResourceManager();
  initResourceManager(index, conf);
  startResourceManager(index);
}
public File getTestWorkDir() { public File getTestWorkDir() {
return testWorkDir; return testWorkDir;
} }
/** /**
* In a HA cluster, go through all the RMs and find the Active RM. If none * In a HA cluster, go through all the RMs and find the Active RM. In a
* of them are active, wait upto 5 seconds for them to transition to Active. * non-HA cluster, return the index of the only RM.
* *
* In an non-HA cluster, return the index of the only RM. * @return index of the active RM or -1 if none of them turn active
*
* @return index of the active RM or -1 if none of them transition to
* active even after 5 seconds of waiting
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@VisibleForTesting @VisibleForTesting
@ -250,9 +347,12 @@ public class MiniYARNCluster extends CompositeService {
return 0; return 0;
} }
int numRetriesForRMBecomingActive = 5; int numRetriesForRMBecomingActive = failoverTimeout / 100;
while (numRetriesForRMBecomingActive-- > 0) { while (numRetriesForRMBecomingActive-- > 0) {
for (int i = 0; i < resourceManagers.length; i++) { for (int i = 0; i < resourceManagers.length; i++) {
if (resourceManagers[i] == null) {
continue;
}
try { try {
if (HAServiceProtocol.HAServiceState.ACTIVE == if (HAServiceProtocol.HAServiceState.ACTIVE ==
resourceManagers[i].getRMContext().getRMAdminService() resourceManagers[i].getRMContext().getRMAdminService()
@ -265,7 +365,7 @@ public class MiniYARNCluster extends CompositeService {
} }
} }
try { try {
Thread.sleep(1000); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new YarnRuntimeException("Interrupted while waiting for one " + throw new YarnRuntimeException("Interrupted while waiting for one " +
"of the ResourceManagers to become active"); "of the ResourceManagers to become active");
@ -282,7 +382,7 @@ public class MiniYARNCluster extends CompositeService {
int activeRMIndex = getActiveRMIndex(); int activeRMIndex = getActiveRMIndex();
return activeRMIndex == -1 return activeRMIndex == -1
? null ? null
: this.resourceManagers[getActiveRMIndex()]; : this.resourceManagers[activeRMIndex];
} }
public ResourceManager getResourceManager(int i) { public ResourceManager getResourceManager(int i) {
@ -310,82 +410,21 @@ public class MiniYARNCluster extends CompositeService {
index = i; index = i;
} }
/**
 * Sets the non-HA ResourceManager address keys to "&lt;hostname&gt;:0" and
 * gives the RM web application the same hostname with port 0.
 *
 * @param conf configuration that receives the address settings
 */
private void setNonHARMConfiguration(Configuration conf) {
  final String host = MiniYARNCluster.getHostname();
  final String hostWithAnyPort = host + ":0";

  conf.set(YarnConfiguration.RM_ADDRESS, hostWithAnyPort);
  conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostWithAnyPort);
  conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostWithAnyPort);
  conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostWithAnyPort);
  WebAppUtils.setRMWebAppHostnameAndPort(conf, host, 0);
}
/**
 * Sets the service addresses of every configured ResourceManager id.
 *
 * For each key in YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS and each id
 * returned by HAUtil.getRMHAIds, the id-suffixed key is set to
 * "&lt;hostname&gt;:0".
 *
 * @param conf configuration read for the RM ids and updated with addresses
 */
private void setHARMConfiguration(Configuration conf) {
  final String hostWithAnyPort = MiniYARNCluster.getHostname() + ":0";
  for (String serviceKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
    for (String rmId : HAUtil.getRMHAIds(conf)) {
      final String perRmKey = HAUtil.addSuffix(serviceKey, rmId);
      conf.set(perRmKey, hostWithAnyPort);
    }
  }
}
@Override @Override
protected synchronized void serviceInit(Configuration conf) protected synchronized void serviceInit(Configuration conf)
throws Exception { throws Exception {
conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true); initResourceManager(index, conf);
if (!useFixedPorts) {
if (HAUtil.isHAEnabled(conf)) {
setHARMConfiguration(conf);
} else {
setNonHARMConfiguration(conf);
}
}
if (HAUtil.isHAEnabled(conf)) {
conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
}
resourceManagers[index].init(conf);
resourceManagers[index].getRMContext().getDispatcher().register
(RMAppAttemptEventType.class,
new EventHandler<RMAppAttemptEvent>() {
public void handle(RMAppAttemptEvent event) {
if (event instanceof RMAppAttemptRegistrationEvent) {
appMasters.put(event.getApplicationAttemptId(), event.getTimestamp());
} else if (event instanceof RMAppAttemptUnregistrationEvent) {
appMasters.remove(event.getApplicationAttemptId());
}
}
});
super.serviceInit(conf); super.serviceInit(conf);
} }
@Override @Override
protected synchronized void serviceStart() throws Exception { protected synchronized void serviceStart() throws Exception {
try { startResourceManager(index);
new Thread() {
public void run() {
resourceManagers[index].start();
}
}.start();
int waitCount = 0;
while (resourceManagers[index].getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for RM to start...");
Thread.sleep(1500);
}
if (resourceManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed.
throw new IOException(
"ResourceManager failed to start. Final state is "
+ resourceManagers[index].getServiceState());
}
super.serviceStart();
} catch (Throwable t) {
throw new YarnRuntimeException(t);
}
LOG.info("MiniYARN ResourceManager address: " + LOG.info("MiniYARN ResourceManager address: " +
getConfig().get(YarnConfiguration.RM_ADDRESS)); getConfig().get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniYARN ResourceManager web address: " + LOG.info("MiniYARN ResourceManager web address: " +
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig())); WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
super.serviceStart();
} }
private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException { private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException {
@ -406,7 +445,6 @@ public class MiniYARNCluster extends CompositeService {
waitForAppMastersToFinish(5000); waitForAppMastersToFinish(5000);
resourceManagers[index].stop(); resourceManagers[index].stop();
} }
super.serviceStop();
if (Shell.WINDOWS) { if (Shell.WINDOWS) {
// On Windows, clean up the short temporary symlink that was created to // On Windows, clean up the short temporary symlink that was created to
@ -420,6 +458,7 @@ public class MiniYARNCluster extends CompositeService {
testWorkDir.getAbsolutePath()); testWorkDir.getAbsolutePath());
} }
} }
super.serviceStop();
} }
} }