YARN-4438. Implement RM leader election with curator. Contributed by Jian He

(cherry picked from commit 89022f8d4b)
(cherry picked from commit 2cbbf76c3d)

Conflicts:

	hadoop-yarn-project/CHANGES.txt
This commit is contained in:
Xuan 2016-01-07 14:33:06 -08:00 committed by Junping Du
parent 6a18ae849f
commit 120f3a0ff9
9 changed files with 494 additions and 21 deletions

View File

@ -550,6 +550,11 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
+ "failover-controller.active-standby-elector.zk.retries"; + "failover-controller.active-standby-elector.zk.retries";
@Private
public static final String CURATOR_LEADER_ELECTOR =
RM_HA_PREFIX + "curator-leader-elector.enabled";
public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
//////////////////////////////// ////////////////////////////////
// RM state store configs // RM state store configs
//////////////////////////////// ////////////////////////////////

View File

@ -90,6 +90,7 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL); .add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
configurationPropsToSkipCompare configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
// Ignore blacklisting nodes for AM failures feature since it is still a // Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress" // "work in progress"

View File

@ -109,6 +109,7 @@ public class AdminService extends CompositeService implements
private String rmId; private String rmId;
private boolean autoFailoverEnabled; private boolean autoFailoverEnabled;
private boolean curatorEnabled;
private EmbeddedElectorService embeddedElector; private EmbeddedElectorService embeddedElector;
private Server server; private Server server;
@ -135,13 +136,16 @@ public class AdminService extends CompositeService implements
@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
if (rmContext.isHAEnabled()) { if (rmContext.isHAEnabled()) {
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf); autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
if (autoFailoverEnabled) { if (autoFailoverEnabled && !curatorEnabled) {
if (HAUtil.isAutomaticFailoverEmbedded(conf)) { if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
embeddedElector = createEmbeddedElectorService(); embeddedElector = createEmbeddedElectorService();
addIfService(embeddedElector); addIfService(embeddedElector);
} }
} }
} }
masterServiceBindAddress = conf.getSocketAddr( masterServiceBindAddress = conf.getSocketAddr(
@ -322,7 +326,7 @@ public class AdminService extends CompositeService implements
rm.transitionToActive(); rm.transitionToActive();
} catch (Exception e) { } catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
"", "RMHAProtocolService", "", "RM",
"Exception transitioning to active"); "Exception transitioning to active");
throw new ServiceFailedException( throw new ServiceFailedException(
"Error when transitioning to Active mode", e); "Error when transitioning to Active mode", e);
@ -341,7 +345,7 @@ public class AdminService extends CompositeService implements
"Error on refreshAll during transistion to Active", e); "Error on refreshAll during transistion to Active", e);
} }
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
"RMHAProtocolService"); "RM");
} }
@Override @Override
@ -359,10 +363,10 @@ public class AdminService extends CompositeService implements
try { try {
rm.transitionToStandby(true); rm.transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(), RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService"); "transitionToStandby", "RM");
} catch (Exception e) { } catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby", RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
"", "RMHAProtocolService", "", "RM",
"Exception transitioning to standby"); "Exception transitioning to standby");
throw new ServiceFailedException( throw new ServiceFailedException(
"Error when transitioning to Standby mode", e); "Error when transitioning to Standby mode", e);
@ -372,14 +376,27 @@ public class AdminService extends CompositeService implements
@Override @Override
public synchronized HAServiceStatus getServiceStatus() throws IOException { public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState"); checkAccess("getServiceState");
HAServiceState haState = rmContext.getHAServiceState(); if (curatorEnabled) {
HAServiceStatus ret = new HAServiceStatus(haState); HAServiceStatus state;
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { if (rmContext.getLeaderElectorService().hasLeaderShip()) {
ret.setReadyToBecomeActive(); state = new HAServiceStatus(HAServiceState.ACTIVE);
} else {
state = new HAServiceStatus(HAServiceState.STANDBY);
}
// set empty string to avoid NPE at
// HAServiceProtocolServerSideTranslatorPB#getServiceStatus
state.setNotReadyToBecomeActive("");
return state;
} else { } else {
ret.setNotReadyToBecomeActive("State is " + haState); HAServiceState haState = rmContext.getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
} }
return ret;
} }
@Override @Override
@ -885,6 +902,12 @@ public class AdminService extends CompositeService implements
} else if (!autoFailoverEnabled) { } else if (!autoFailoverEnabled) {
return "Auto Failover is not enabled."; return "Auto Failover is not enabled.";
} }
return this.embeddedElector.getHAZookeeperConnectionState(); if (curatorEnabled) {
return "Connected to zookeeper : " + rmContext
.getLeaderElectorService().getCuratorClient().getZookeeperClient()
.isConnected();
} else {
return this.embeddedElector.getHAZookeeperConnectionState();
}
} }
} }

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException;
public class LeaderElectorService extends AbstractService implements
LeaderLatchListener {
public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
private LeaderLatch leaderLatch;
private CuratorFramework curator;
private RMContext rmContext;
private String latchPath;
private String rmId;
public LeaderElectorService(RMContext rmContext) {
super(LeaderElectorService.class.getName());
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
Preconditions.checkNotNull(zkHostPort,
YarnConfiguration.RM_ZK_ADDRESS + " is not set");
rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf);
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
String zkBasePath = conf.get(
YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
latchPath = zkBasePath + "/" + clusterId;
curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
.retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
curator.start();
initAndStartLeaderLatch();
super.serviceInit(conf);
}
private void initAndStartLeaderLatch() throws Exception {
leaderLatch = new LeaderLatch(curator, latchPath, rmId);
leaderLatch.addListener(this);
leaderLatch.start();
}
@Override
protected void serviceStop() throws Exception {
closeLeaderLatch();
super.serviceStop();
}
public boolean hasLeaderShip() {
return leaderLatch.hasLeadership();
}
@Override
public void isLeader() {
LOG.info(rmId + "is elected leader, transitioning to active");
try {
rmContext.getRMAdminService().transitionToActive(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
LOG.info(rmId + " failed to transition to active, giving up leadership",
e);
notLeader();
reJoinElection();
}
}
public void reJoinElection() {
try {
closeLeaderLatch();
Thread.sleep(1000);
initAndStartLeaderLatch();
} catch (Exception e) {
LOG.info("Fail to re-join election.", e);
}
}
private void closeLeaderLatch() throws IOException {
if (leaderLatch != null) {
leaderLatch.close();
}
}
@Override
public void notLeader() {
LOG.info(rmId + " relinquish leadership");
try {
rmContext.getRMAdminService().transitionToStandby(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
LOG.info(rmId + " did not transition to standby successfully.");
}
}
// only for testing
@VisibleForTesting
public CuratorFramework getCuratorClient() {
return this.curator;
}
}

View File

@ -135,4 +135,8 @@ public interface RMContext {
PlacementManager getQueuePlacementManager(); PlacementManager getQueuePlacementManager();
void setQueuePlacementManager(PlacementManager placementMgr); void setQueuePlacementManager(PlacementManager placementMgr);
void setLeaderElectorService(LeaderElectorService elector);
LeaderElectorService getLeaderElectorService();
} }

View File

@ -72,6 +72,7 @@ public class RMContextImpl implements RMContext {
private RMApplicationHistoryWriter rmApplicationHistoryWriter; private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher; private SystemMetricsPublisher systemMetricsPublisher;
private LeaderElectorService elector;
private final Object haServiceStateLock = new Object(); private final Object haServiceStateLock = new Object();
@ -135,6 +136,16 @@ public class RMContextImpl implements RMContext {
return this.rmDispatcher; return this.rmDispatcher;
} }
@Override
public void setLeaderElectorService(LeaderElectorService elector) {
this.elector = elector;
}
@Override
public LeaderElectorService getLeaderElectorService() {
return this.elector;
}
@Override @Override
public RMStateStore getStateStore() { public RMStateStore getStateStore() {
return activeServiceContext.getStateStore(); return activeServiceContext.getStateStore();

View File

@ -157,6 +157,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private AppReportFetcher fetcher = null; private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker; protected ResourceTrackerService resourceTracker;
private JvmPauseMonitor pauseMonitor; private JvmPauseMonitor pauseMonitor;
private boolean curatorEnabled = false;
@VisibleForTesting @VisibleForTesting
protected String webAppAddress; protected String webAppAddress;
@ -233,6 +234,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) { if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(this.conf); HAUtil.verifyAndSetConfiguration(this.conf);
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
LeaderElectorService elector = new LeaderElectorService(rmContext);
addService(elector);
rmContext.setLeaderElectorService(elector);
}
} }
// Set UGI and do login // Set UGI and do login
@ -772,7 +780,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
// Transition to standby and reinit active services // Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode"); LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true); transitionToStandby(true);
adminService.resetLeaderElection(); if (curatorEnabled) {
rmContext.getLeaderElectorService().reJoinElection();
} else {
adminService.resetLeaderElection();
}
return; return;
} catch (Exception e) { } catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode."); LOG.fatal("Failed to transition RM to Standby mode.");
@ -982,7 +994,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* instance of {@link RMActiveServices} and initializes it. * instance of {@link RMActiveServices} and initializes it.
* @throws Exception * @throws Exception
*/ */
protected void createAndInitActiveServices() throws Exception { protected void createAndInitActiveServices() {
activeServices = new RMActiveServices(this); activeServices = new RMActiveServices(this);
activeServices.init(conf); activeServices.init(conf);
} }
@ -1002,14 +1014,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
* Helper method to stop {@link #activeServices}. * Helper method to stop {@link #activeServices}.
* @throws Exception * @throws Exception
*/ */
void stopActiveServices() throws Exception { void stopActiveServices() {
if (activeServices != null) { if (activeServices != null) {
activeServices.stop(); activeServices.stop();
activeServices = null; activeServices = null;
} }
} }
void reinitialize(boolean initialize) throws Exception { void reinitialize(boolean initialize) {
ClusterMetrics.destroy(); ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics(); QueueMetrics.clearQueueMetrics();
if (initialize) { if (initialize) {
@ -1028,7 +1040,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
LOG.info("Already in active state"); LOG.info("Already in active state");
return; return;
} }
LOG.info("Transitioning to active state"); LOG.info("Transitioning to active state");
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() { this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
@ -1069,7 +1080,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
if (this.rmContext.isHAEnabled()) { if (this.rmContext.isHAEnabled()) {
transitionToStandby(true); transitionToStandby(false);
} else { } else {
transitionToActive(); transitionToActive();
} }
@ -1324,4 +1335,5 @@ public class ResourceManager extends CompositeService implements Recoverable {
out.println(" " out.println(" "
+ "[-remove-application-from-state-store <appId>]" + "\n"); + "[-remove-application-from-state-store <appId>]" + "\n");
} }
} }

View File

@ -0,0 +1,269 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.base.Supplier;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestLeaderElectorService {
private static final String RM1_ADDRESS = "1.1.1.1:1";
private static final String RM1_NODE_ID = "rm1";
private static final String RM2_ADDRESS = "0.0.0.0:0";
private static final String RM2_NODE_ID = "rm2";
Configuration conf ;
MockRM rm1;
MockRM rm2;
TestingCluster zkCluster;
@Before
public void setUp() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.INFO);
conf = new Configuration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
}
zkCluster = new TestingCluster(3);
conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
zkCluster.start();
}
@After
public void tearDown() throws Exception {
if (rm1 != null) {
rm1.stop();
}
if (rm2 !=null) {
rm2.stop();
}
}
// 1. rm1 active
// 2. rm2 standby
// 3. stop rm1
// 4. rm2 become active
@Test (timeout = 20000)
public void testRMShutDownCauseFailover() throws Exception {
rm1 = startRM("rm1", HAServiceState.ACTIVE);
rm2 = startRM("rm2", HAServiceState.STANDBY);
// wait for some time to make sure rm2 will not become active;
Thread.sleep(5000);
waitFor(rm2, HAServiceState.STANDBY);
rm1.stop();
// rm2 should become active;
waitFor(rm2, HAServiceState.ACTIVE);
}
// 1. rm1 active
// 2. rm2 standby
// 3. submit a job to rm1 which triggers state-store failure.
// 4. rm2 become
@Test
public void testStateStoreFailureCauseFailover() throws Exception {
conf.set(YarnConfiguration.RM_HA_ID, "rm1");
MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override
public synchronized void storeApplicationStateInternal(ApplicationId
appId, ApplicationStateData appState) throws Exception{
throw new Exception("store app failure.");
}
};
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.init(conf);
rm1.start();
waitFor(rm1, HAServiceState.ACTIVE);
rm2 = startRM("rm2", HAServiceState.STANDBY);
// submit an app which will trigger state-store failure.
rm1.submitApp(200, "app1", "user1", null, "default", false);
waitFor(rm1, HAServiceState.STANDBY);
// rm2 should become active;
waitFor(rm2, HAServiceState.ACTIVE);
rm2.stop();
// rm1 will become active again
waitFor(rm1, HAServiceState.ACTIVE);
}
// 1. rm1 active
// 2. restart zk cluster
// 3. rm1 will first relinquish leadership and re-acquire leadership
@Test
public void testZKClusterDown() throws Exception {
rm1 = startRM("rm1", HAServiceState.ACTIVE);
// stop zk cluster
zkCluster.stop();
waitFor(rm1, HAServiceState.STANDBY);
Collection<InstanceSpec> instanceSpecs = zkCluster.getInstances();
zkCluster = new TestingCluster(instanceSpecs);
zkCluster.start();
// rm becomes active again
waitFor(rm1, HAServiceState.ACTIVE);
}
// 1. rm1 active
// 2. kill the zk session between the rm and zk cluster.
// 3. rm1 will first relinquish leadership and re-acquire leadership
@Test
public void testExpireCurrentZKSession() throws Exception{
rm1 = startRM("rm1", HAServiceState.ACTIVE);
LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
CuratorZookeeperClient client =
service.getCuratorClient().getZookeeperClient();
// this will expire current curator client session. curator will re-establish
// the session. RM will first relinquish leadership and re-acquire leadership
KillSession
.kill(client.getZooKeeper(), client.getCurrentConnectionString());
waitFor(rm1, HAServiceState.ACTIVE);
}
// 1. rm1 fail to become active.
// 2. rm1 will rejoin leader election and retry the leadership
@Test
public void testRMFailToTransitionToActive() throws Exception{
conf.set(YarnConfiguration.RM_HA_ID, "rm1");
final AtomicBoolean throwException = new AtomicBoolean(true);
Thread launchRM = new Thread() {
@Override
public void run() {
rm1 = new MockRM(conf) {
@Override
synchronized void transitionToActive() throws Exception {
if (throwException.get()) {
throw new Exception("Fail to transition to active");
} else {
super.transitionToActive();
}
}
};
rm1.init(conf);
rm1.start();
}
};
launchRM.start();
// wait some time, rm will keep retry the leadership;
Thread.sleep(5000);
throwException.set(false);
waitFor(rm1, HAServiceState.ACTIVE);
}
// 1. rm1 active
// 2. rm2 standby
// 3. kill the current connected zk instance
// 4. either rm1 or rm2 will become active.
@Test
public void testKillZKInstance() throws Exception {
rm1 = startRM("rm1", HAServiceState.ACTIVE);
rm2 = startRM("rm2", HAServiceState.STANDBY);
ZooKeeper zkClient =
rm1.getRMContext().getLeaderElectorService().getCuratorClient()
.getZookeeperClient().getZooKeeper();
InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
zkCluster.killServer(connectionInstance);
// wait for rm1 or rm2 to be active by randomness
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
try {
HAServiceState rm1State =
rm1.getAdminService().getServiceStatus().getState();
HAServiceState rm2State =
rm2.getAdminService().getServiceStatus().getState();
return (rm1State.equals(HAServiceState.ACTIVE) && rm2State
.equals(HAServiceState.STANDBY)) || (
rm1State.equals(HAServiceState.STANDBY) && rm2State
.equals(HAServiceState.ACTIVE));
} catch (IOException e) {
}
return false;
}
}, 2000, 15000);
}
private MockRM startRM(String rmId, HAServiceState state) throws Exception{
YarnConfiguration yarnConf = new YarnConfiguration(conf);
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
MockRM rm = new MockRM(yarnConf);
rm.init(yarnConf);
rm.start();
waitFor(rm, state);
return rm;
}
private void waitFor(final MockRM rm,
final HAServiceState state)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
try {
return rm.getAdminService().getServiceStatus().getState()
.equals(state);
} catch (IOException e) {
}
return false;
}
}, 2000, 15000);
}
}

View File

@ -471,8 +471,12 @@ public class TestRMHA {
memStore.init(conf); memStore.init(conf);
rm = new MockRM(conf, memStore) { rm = new MockRM(conf, memStore) {
@Override @Override
void stopActiveServices() throws Exception { void stopActiveServices() {
Thread.sleep(10000); try {
Thread.sleep(10000);
} catch (Exception e) {
throw new RuntimeException (e);
}
super.stopActiveServices(); super.stopActiveServices();
} }
}; };