YARN-4438. Implement RM leader election with curator. Contributed by Jian He

(cherry picked from commit 89022f8d4b)
This commit is contained in:
Xuan 2016-01-07 14:33:06 -08:00
parent ee9611719f
commit 2cbbf76c3d
10 changed files with 496 additions and 21 deletions

View File

@ -28,6 +28,8 @@ Release 2.9.0 - UNRELEASED
YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
YARN-4438. Implement RM leader election with curator. (Jian He via xgong)
OPTIMIZATIONS
BUG FIXES

View File

@ -545,6 +545,11 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
+ "failover-controller.active-standby-elector.zk.retries";
@Private
public static final String CURATOR_LEADER_ELECTOR =
RM_HA_PREFIX + "curator-leader-elector.enabled";
public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
////////////////////////////////
// RM state store configs
////////////////////////////////

View File

@ -90,6 +90,7 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
// Ignore all YARN Application Timeline Service (version 1) properties
configurationPrefixToSkipCompare.add("yarn.timeline-service.");

View File

@ -106,6 +106,7 @@ public class AdminService extends CompositeService implements
private String rmId;
private boolean autoFailoverEnabled;
private boolean curatorEnabled;
private EmbeddedElectorService embeddedElector;
private Server server;
@ -132,13 +133,16 @@ public class AdminService extends CompositeService implements
@Override
public void serviceInit(Configuration conf) throws Exception {
if (rmContext.isHAEnabled()) {
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
if (autoFailoverEnabled) {
if (autoFailoverEnabled && !curatorEnabled) {
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
embeddedElector = createEmbeddedElectorService();
addIfService(embeddedElector);
}
}
}
masterServiceBindAddress = conf.getSocketAddr(
@ -319,7 +323,7 @@ public class AdminService extends CompositeService implements
rm.transitionToActive();
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
"", "RMHAProtocolService",
"", "RM",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
@ -338,7 +342,7 @@ public class AdminService extends CompositeService implements
"Error on refreshAll during transistion to Active", e);
}
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
"RMHAProtocolService");
"RM");
}
@Override
@ -356,10 +360,10 @@ public class AdminService extends CompositeService implements
try {
rm.transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
"transitionToStandby", "RM");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
"", "RMHAProtocolService",
"", "RM",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
@ -369,15 +373,28 @@ public class AdminService extends CompositeService implements
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceState haState = rmContext.getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
if (curatorEnabled) {
HAServiceStatus state;
if (rmContext.getLeaderElectorService().hasLeaderShip()) {
state = new HAServiceStatus(HAServiceState.ACTIVE);
} else {
state = new HAServiceStatus(HAServiceState.STANDBY);
}
// set empty string to avoid NPE at
// HAServiceProtocolServerSideTranslatorPB#getServiceStatus
state.setNotReadyToBecomeActive("");
return state;
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
HAServiceState haState = rmContext.getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
}
return ret;
}
}
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
@ -837,6 +854,12 @@ public class AdminService extends CompositeService implements
} else if (!autoFailoverEnabled) {
return "Auto Failover is not enabled.";
}
return this.embeddedElector.getHAZookeeperConnectionState();
if (curatorEnabled) {
return "Connected to zookeeper : " + rmContext
.getLeaderElectorService().getCuratorClient().getZookeeperClient()
.isConnected();
} else {
return this.embeddedElector.getHAZookeeperConnectionState();
}
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException;
public class LeaderElectorService extends AbstractService implements
LeaderLatchListener {
public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
private LeaderLatch leaderLatch;
private CuratorFramework curator;
private RMContext rmContext;
private String latchPath;
private String rmId;
public LeaderElectorService(RMContext rmContext) {
super(LeaderElectorService.class.getName());
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
Preconditions.checkNotNull(zkHostPort,
YarnConfiguration.RM_ZK_ADDRESS + " is not set");
rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf);
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
String zkBasePath = conf.get(
YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
latchPath = zkBasePath + "/" + clusterId;
curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
.retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
curator.start();
initAndStartLeaderLatch();
super.serviceInit(conf);
}
private void initAndStartLeaderLatch() throws Exception {
leaderLatch = new LeaderLatch(curator, latchPath, rmId);
leaderLatch.addListener(this);
leaderLatch.start();
}
@Override
protected void serviceStop() throws Exception {
closeLeaderLatch();
super.serviceStop();
}
public boolean hasLeaderShip() {
return leaderLatch.hasLeadership();
}
@Override
public void isLeader() {
LOG.info(rmId + "is elected leader, transitioning to active");
try {
rmContext.getRMAdminService().transitionToActive(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
LOG.info(rmId + " failed to transition to active, giving up leadership",
e);
notLeader();
reJoinElection();
}
}
public void reJoinElection() {
try {
closeLeaderLatch();
Thread.sleep(1000);
initAndStartLeaderLatch();
} catch (Exception e) {
LOG.info("Fail to re-join election.", e);
}
}
private void closeLeaderLatch() throws IOException {
if (leaderLatch != null) {
leaderLatch.close();
}
}
@Override
public void notLeader() {
LOG.info(rmId + " relinquish leadership");
try {
rmContext.getRMAdminService().transitionToStandby(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
LOG.info(rmId + " did not transition to standby successfully.");
}
}
// only for testing
@VisibleForTesting
public CuratorFramework getCuratorClient() {
return this.curator;
}
}

View File

@ -135,4 +135,8 @@ public interface RMContext {
PlacementManager getQueuePlacementManager();
void setQueuePlacementManager(PlacementManager placementMgr);
void setLeaderElectorService(LeaderElectorService elector);
LeaderElectorService getLeaderElectorService();
}

View File

@ -72,6 +72,7 @@ public class RMContextImpl implements RMContext {
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private LeaderElectorService elector;
/**
* Default constructor. To be used in conjunction with setter methods for
@ -133,6 +134,16 @@ public class RMContextImpl implements RMContext {
return this.rmDispatcher;
}
@Override
public void setLeaderElectorService(LeaderElectorService elector) {
this.elector = elector;
}
@Override
public LeaderElectorService getLeaderElectorService() {
return this.elector;
}
@Override
public RMStateStore getStateStore() {
return activeServiceContext.getStateStore();

View File

@ -157,6 +157,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
private JvmPauseMonitor pauseMonitor;
private boolean curatorEnabled = false;
@VisibleForTesting
protected String webAppAddress;
@ -228,6 +229,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(this.conf);
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
LeaderElectorService elector = new LeaderElectorService(rmContext);
addService(elector);
rmContext.setLeaderElectorService(elector);
}
}
// Set UGI and do login
@ -759,7 +767,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true);
adminService.resetLeaderElection();
if (curatorEnabled) {
rmContext.getLeaderElectorService().reJoinElection();
} else {
adminService.resetLeaderElection();
}
return;
} catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode.");
@ -996,7 +1008,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* instance of {@link RMActiveServices} and initializes it.
* @throws Exception
*/
protected void createAndInitActiveServices() throws Exception {
protected void createAndInitActiveServices() {
activeServices = new RMActiveServices(this);
activeServices.init(conf);
}
@ -1016,14 +1028,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
* Helper method to stop {@link #activeServices}.
* @throws Exception
*/
void stopActiveServices() throws Exception {
void stopActiveServices() {
if (activeServices != null) {
activeServices.stop();
activeServices = null;
}
}
void reinitialize(boolean initialize) throws Exception {
void reinitialize(boolean initialize) {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
if (initialize) {
@ -1042,7 +1054,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
LOG.info("Already in active state");
return;
}
LOG.info("Transitioning to active state");
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
@ -1083,7 +1094,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceStart() throws Exception {
if (this.rmContext.isHAEnabled()) {
transitionToStandby(true);
transitionToStandby(false);
} else {
transitionToActive();
}
@ -1338,4 +1349,5 @@ public class ResourceManager extends CompositeService implements Recoverable {
out.println(" "
+ "[-remove-application-from-state-store <appId>]" + "\n");
}
}

View File

@ -0,0 +1,269 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.base.Supplier;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestLeaderElectorService {
private static final String RM1_ADDRESS = "1.1.1.1:1";
private static final String RM1_NODE_ID = "rm1";
private static final String RM2_ADDRESS = "0.0.0.0:0";
private static final String RM2_NODE_ID = "rm2";
Configuration conf ;
MockRM rm1;
MockRM rm2;
TestingCluster zkCluster;
@Before
public void setUp() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.INFO);
conf = new Configuration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
}
zkCluster = new TestingCluster(3);
conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
zkCluster.start();
}
@After
public void tearDown() throws Exception {
if (rm1 != null) {
rm1.stop();
}
if (rm2 !=null) {
rm2.stop();
}
}
// 1. rm1 active
// 2. rm2 standby
// 3. stop rm1
// 4. rm2 become active
@Test (timeout = 20000)
public void testRMShutDownCauseFailover() throws Exception {
rm1 = startRM("rm1", HAServiceState.ACTIVE);
rm2 = startRM("rm2", HAServiceState.STANDBY);
// wait for some time to make sure rm2 will not become active;
Thread.sleep(5000);
waitFor(rm2, HAServiceState.STANDBY);
rm1.stop();
// rm2 should become active;
waitFor(rm2, HAServiceState.ACTIVE);
}
// 1. rm1 active
// 2. rm2 standby
// 3. submit a job to rm1 which triggers state-store failure.
// 4. rm2 become
@Test
public void testStateStoreFailureCauseFailover() throws Exception {
conf.set(YarnConfiguration.RM_HA_ID, "rm1");
MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override
public synchronized void storeApplicationStateInternal(ApplicationId
appId, ApplicationStateData appState) throws Exception{
throw new Exception("store app failure.");
}
};
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.init(conf);
rm1.start();
waitFor(rm1, HAServiceState.ACTIVE);
rm2 = startRM("rm2", HAServiceState.STANDBY);
// submit an app which will trigger state-store failure.
rm1.submitApp(200, "app1", "user1", null, "default", false);
waitFor(rm1, HAServiceState.STANDBY);
// rm2 should become active;
waitFor(rm2, HAServiceState.ACTIVE);
rm2.stop();
// rm1 will become active again
waitFor(rm1, HAServiceState.ACTIVE);
}
// 1. rm1 active
// 2. restart zk cluster
// 3. rm1 will first relinquish leadership and re-acquire leadership
@Test
public void testZKClusterDown() throws Exception {
rm1 = startRM("rm1", HAServiceState.ACTIVE);
// stop zk cluster
zkCluster.stop();
waitFor(rm1, HAServiceState.STANDBY);
Collection<InstanceSpec> instanceSpecs = zkCluster.getInstances();
zkCluster = new TestingCluster(instanceSpecs);
zkCluster.start();
// rm becomes active again
waitFor(rm1, HAServiceState.ACTIVE);
}
// 1. rm1 active
// 2. kill the zk session between the rm and zk cluster.
// 3. rm1 will first relinquish leadership and re-acquire leadership
@Test
public void testExpireCurrentZKSession() throws Exception{
rm1 = startRM("rm1", HAServiceState.ACTIVE);
LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
CuratorZookeeperClient client =
service.getCuratorClient().getZookeeperClient();
// this will expire current curator client session. curator will re-establish
// the session. RM will first relinquish leadership and re-acquire leadership
KillSession
.kill(client.getZooKeeper(), client.getCurrentConnectionString());
waitFor(rm1, HAServiceState.ACTIVE);
}
// 1. rm1 fail to become active.
// 2. rm1 will rejoin leader election and retry the leadership
@Test
public void testRMFailToTransitionToActive() throws Exception{
conf.set(YarnConfiguration.RM_HA_ID, "rm1");
final AtomicBoolean throwException = new AtomicBoolean(true);
Thread launchRM = new Thread() {
@Override
public void run() {
rm1 = new MockRM(conf) {
@Override
synchronized void transitionToActive() throws Exception {
if (throwException.get()) {
throw new Exception("Fail to transition to active");
} else {
super.transitionToActive();
}
}
};
rm1.init(conf);
rm1.start();
}
};
launchRM.start();
// wait some time, rm will keep retry the leadership;
Thread.sleep(5000);
throwException.set(false);
waitFor(rm1, HAServiceState.ACTIVE);
}
// 1. rm1 active
// 2. rm2 standby
// 3. kill the current connected zk instance
// 4. either rm1 or rm2 will become active.
@Test
public void testKillZKInstance() throws Exception {
rm1 = startRM("rm1", HAServiceState.ACTIVE);
rm2 = startRM("rm2", HAServiceState.STANDBY);
ZooKeeper zkClient =
rm1.getRMContext().getLeaderElectorService().getCuratorClient()
.getZookeeperClient().getZooKeeper();
InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
zkCluster.killServer(connectionInstance);
// wait for rm1 or rm2 to be active by randomness
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
try {
HAServiceState rm1State =
rm1.getAdminService().getServiceStatus().getState();
HAServiceState rm2State =
rm2.getAdminService().getServiceStatus().getState();
return (rm1State.equals(HAServiceState.ACTIVE) && rm2State
.equals(HAServiceState.STANDBY)) || (
rm1State.equals(HAServiceState.STANDBY) && rm2State
.equals(HAServiceState.ACTIVE));
} catch (IOException e) {
}
return false;
}
}, 2000, 15000);
}
private MockRM startRM(String rmId, HAServiceState state) throws Exception{
YarnConfiguration yarnConf = new YarnConfiguration(conf);
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
MockRM rm = new MockRM(yarnConf);
rm.init(yarnConf);
rm.start();
waitFor(rm, state);
return rm;
}
private void waitFor(final MockRM rm,
final HAServiceState state)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
try {
return rm.getAdminService().getServiceStatus().getState()
.equals(state);
} catch (IOException e) {
}
return false;
}
}, 2000, 15000);
}
}

View File

@ -471,8 +471,12 @@ public class TestRMHA {
memStore.init(conf);
rm = new MockRM(conf, memStore) {
@Override
void stopActiveServices() throws Exception {
Thread.sleep(10000);
void stopActiveServices() {
try {
Thread.sleep(10000);
} catch (Exception e) {
throw new RuntimeException (e);
}
super.stopActiveServices();
}
};