Merge r1523750 from trunk to branch-2 for YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1523753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
73c69decf1
commit
db17c6ac06
|
@ -14,6 +14,7 @@ Release 2.3.0 - UNRELEASED
|
|||
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
|
||||
YARN-1098. Separate out RM services into Always On and Active (Karthik
|
||||
Kambatla via bikas)
|
||||
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
|
|
|
@ -270,6 +270,11 @@ public class YarnConfiguration extends Configuration {
|
|||
|
||||
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
|
||||
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
|
||||
|
||||
/** HA related configs */
|
||||
public static final String RM_HA_PREFIX = RM_PREFIX + "ha.";
|
||||
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
|
||||
public static final boolean DEFAULT_RM_HA_ENABLED = false;
|
||||
|
||||
/** The class to use as the persistent store.*/
|
||||
public static final String RM_STORE = RM_PREFIX + "store.class";
|
||||
|
|
|
@ -268,6 +268,14 @@
|
|||
<!--value>hdfs://localhost:9000/rmstore</value-->
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Enable RM high-availability. When enabled, the RM starts
|
||||
in the Standby mode by default, and transitions to the Active mode when
|
||||
prompted to.</description>
|
||||
<name>yarn.resourcemanager.ha.enabled</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The maximum number of completed applications RM keeps. </description>
|
||||
<name>yarn.resourcemanager.max-completed-applications</name>
|
||||
|
|
|
@ -196,7 +196,7 @@ public class ClientRMService extends AbstractService implements
|
|||
|
||||
ApplicationId getNewApplicationId() {
|
||||
ApplicationId applicationId = org.apache.hadoop.yarn.server.utils.BuilderUtils
|
||||
.newApplicationId(recordFactory, ResourceManager.clusterTimeStamp,
|
||||
.newApplicationId(recordFactory, ResourceManager.getClusterTimeStamp(),
|
||||
applicationCounter.incrementAndGet());
|
||||
LOG.info("Allocated new applicationId: " + applicationId.getId());
|
||||
return applicationId;
|
||||
|
|
|
@ -0,0 +1,153 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class RMHAProtocolService extends AbstractService implements
|
||||
HAServiceProtocol {
|
||||
private static final Log LOG = LogFactory.getLog(RMHAProtocolService.class);
|
||||
|
||||
private Configuration conf;
|
||||
private ResourceManager rm;
|
||||
@VisibleForTesting
|
||||
protected HAServiceState haState = HAServiceState.INITIALIZING;
|
||||
|
||||
public RMHAProtocolService(ResourceManager resourceManager) {
|
||||
super("RMHAProtocolService");
|
||||
this.rm = resourceManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void serviceInit(Configuration conf) throws Exception {
|
||||
this.conf = conf;
|
||||
rm.createAndInitActiveServices();
|
||||
super.serviceInit(this.conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void serviceStart() throws Exception {
|
||||
boolean haEnabled = this.conf.getBoolean(YarnConfiguration.RM_HA_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
|
||||
|
||||
if (haEnabled) {
|
||||
transitionToStandby(true);
|
||||
} else {
|
||||
transitionToActive();
|
||||
}
|
||||
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void serviceStop() throws Exception {
|
||||
transitionToStandby(false);
|
||||
haState = HAServiceState.STOPPING;
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void monitorHealth() throws HealthCheckFailedException {
|
||||
if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
|
||||
throw new HealthCheckFailedException(
|
||||
"Active ResourceManager services are not running!");
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void transitionToActive() throws Exception {
|
||||
if (haState == HAServiceState.ACTIVE) {
|
||||
LOG.info("Already in active state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to active");
|
||||
rm.startActiveServices();
|
||||
haState = HAServiceState.ACTIVE;
|
||||
LOG.info("Transitioned to active");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) {
|
||||
// TODO (YARN-1177): When automatic failover is enabled,
|
||||
// check if transition should be allowed for this request
|
||||
try {
|
||||
transitionToActive();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error when transitioning to Active mode", e);
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void transitionToStandby(boolean initialize)
|
||||
throws Exception {
|
||||
if (haState == HAServiceState.STANDBY) {
|
||||
LOG.info("Already in standby state");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Transitioning to standby");
|
||||
if (haState == HAServiceState.ACTIVE) {
|
||||
rm.stopActiveServices();
|
||||
if (initialize) {
|
||||
rm.createAndInitActiveServices();
|
||||
}
|
||||
}
|
||||
haState = HAServiceState.STANDBY;
|
||||
LOG.info("Transitioned to standby");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo) {
|
||||
// TODO (YARN-1177): When automatic failover is enabled,
|
||||
// check if transition should be allowed for this request
|
||||
try {
|
||||
transitionToStandby(true);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error when transitioning to Standby mode", e);
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized HAServiceStatus getServiceStatus() throws IOException {
|
||||
HAServiceStatus ret = new HAServiceStatus(haState);
|
||||
if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) {
|
||||
ret.setReadyToBecomeActive();
|
||||
} else {
|
||||
ret.setNotReadyToBecomeActive("State is " + haState);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
|
@ -105,7 +105,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
|
||||
public static final long clusterTimeStamp = System.currentTimeMillis();
|
||||
private static long clusterTimeStamp = System.currentTimeMillis();
|
||||
|
||||
/**
|
||||
* "Always On" services. Services that need to run always irrespective of
|
||||
* the HA state of the RM.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected RMHAProtocolService haService;
|
||||
|
||||
/**
|
||||
* "Active" services. Services that need to run only on the Active RM.
|
||||
|
@ -155,14 +162,18 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
public RMContext getRMContext() {
|
||||
return this.rmContext;
|
||||
}
|
||||
|
||||
public static long getClusterTimeStamp() {
|
||||
return clusterTimeStamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
validateConfigs(conf);
|
||||
this.conf = conf;
|
||||
|
||||
activeServices = new RMActiveServices();
|
||||
addService(activeServices);
|
||||
haService = new RMHAProtocolService(this);
|
||||
addService(haService);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
|
@ -470,6 +481,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
LOG.error("Error closing store.", e);
|
||||
}
|
||||
}
|
||||
|
||||
super.serviceStop();
|
||||
}
|
||||
}
|
||||
|
@ -708,6 +720,43 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
webApp = builder.start(new RMWebApp(this));
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create and init {@link #activeServices}. This creates an
|
||||
* instance of {@link RMActiveServices} and initializes it.
|
||||
* @throws Exception
|
||||
*/
|
||||
void createAndInitActiveServices() throws Exception {
|
||||
activeServices = new RMActiveServices();
|
||||
activeServices.init(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to start {@link #activeServices}.
|
||||
* @throws Exception
|
||||
*/
|
||||
void startActiveServices() throws Exception {
|
||||
if (activeServices != null) {
|
||||
clusterTimeStamp = System.currentTimeMillis();
|
||||
activeServices.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to stop {@link #activeServices}.
|
||||
* @throws Exception
|
||||
*/
|
||||
void stopActiveServices() throws Exception {
|
||||
if (activeServices != null) {
|
||||
activeServices.stop();
|
||||
activeServices = null;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected boolean areActiveServicesRunning() {
|
||||
return activeServices != null && activeServices.isInState(STATE.STARTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
try {
|
||||
|
@ -715,7 +764,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
} catch(IOException ie) {
|
||||
throw new YarnRuntimeException("Failed to login", ie);
|
||||
}
|
||||
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
|
|
|
@ -229,7 +229,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
+ ", assigned nodeId " + nodeId;
|
||||
LOG.info(message);
|
||||
response.setNodeAction(NodeAction.NORMAL);
|
||||
response.setRMIdentifier(ResourceManager.clusterTimeStamp);
|
||||
response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
|
||||
return response;
|
||||
}
|
||||
|
||||
|
|
|
@ -185,7 +185,7 @@ public class RMContainerTokenSecretManager extends
|
|||
tokenIdentifier =
|
||||
new ContainerTokenIdentifier(containerId, nodeId.toString(),
|
||||
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
|
||||
.getMasterKey().getKeyId(), ResourceManager.clusterTimeStamp);
|
||||
.getMasterKey().getKeyId(), ResourceManager.getClusterTimeStamp());
|
||||
password = this.createPassword(tokenIdentifier);
|
||||
|
||||
} finally {
|
||||
|
|
|
@ -108,7 +108,7 @@ public class AppInfo {
|
|||
this.diagnostics = "";
|
||||
}
|
||||
this.finalStatus = app.getFinalApplicationStatus();
|
||||
this.clusterId = ResourceManager.clusterTimeStamp;
|
||||
this.clusterId = ResourceManager.getClusterTimeStamp();
|
||||
if (hasAccess) {
|
||||
this.startedTime = app.getStartTime();
|
||||
this.finishedTime = app.getFinishTime();
|
||||
|
|
|
@ -44,7 +44,7 @@ public class ClusterInfo {
|
|||
} // JAXB needs this
|
||||
|
||||
public ClusterInfo(ResourceManager rm) {
|
||||
long ts = ResourceManager.clusterTimeStamp;
|
||||
long ts = ResourceManager.getClusterTimeStamp();
|
||||
|
||||
this.id = ts;
|
||||
this.state = rm.getServiceState();
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestRMHA {
|
||||
private Log LOG = LogFactory.getLog(TestRMHA.class);
|
||||
private MockRM rm = null;
|
||||
private static final String STATE_ERR =
|
||||
"ResourceManager is in wrong HA state";
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
rm = new MockRM(conf);
|
||||
rm.init(conf);
|
||||
}
|
||||
|
||||
private void checkMonitorHealth() {
|
||||
try {
|
||||
rm.haService.monitorHealth();
|
||||
} catch (HealthCheckFailedException e) {
|
||||
fail("The RM is in bad health: it is Active, but the active services " +
|
||||
"are not running");
|
||||
}
|
||||
}
|
||||
|
||||
private void checkStandbyRMFunctionality() throws IOException {
|
||||
assertEquals(STATE_ERR, HAServiceState.STANDBY,
|
||||
rm.haService.getServiceStatus().getState());
|
||||
assertFalse("Active RM services are started",
|
||||
rm.areActiveServicesRunning());
|
||||
assertTrue("RM is not ready to become active",
|
||||
rm.haService.getServiceStatus().isReadyToBecomeActive());
|
||||
}
|
||||
|
||||
private void checkActiveRMFunctionality() throws IOException {
|
||||
assertEquals(STATE_ERR, HAServiceState.ACTIVE,
|
||||
rm.haService.getServiceStatus().getState());
|
||||
assertTrue("Active RM services aren't started",
|
||||
rm.areActiveServicesRunning());
|
||||
assertTrue("RM is not ready to become active",
|
||||
rm.haService.getServiceStatus().isReadyToBecomeActive());
|
||||
|
||||
try {
|
||||
rm.getNewAppId();
|
||||
rm.registerNode("127.0.0.1:0", 2048);
|
||||
rm.submitApp(1024);
|
||||
} catch (Exception e) {
|
||||
fail("Unable to perform Active RM functions");
|
||||
LOG.error("ActiveRM check failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify the following RM HA transitions to the following states.
|
||||
* 1. Standby: Should be a no-op
|
||||
* 2. Active: Active services should start
|
||||
* 3. Active: Should be a no-op.
|
||||
* While active, submit a couple of jobs
|
||||
* 4. Standby: Active services should stop
|
||||
* 5. Active: Active services should start
|
||||
* 6. Stop the RM: All services should stop and RM should not be ready to
|
||||
* become Active
|
||||
*/
|
||||
@Test (timeout = 30000)
|
||||
public void testStartAndTransitions() throws IOException {
|
||||
StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||
|
||||
assertEquals(STATE_ERR, HAServiceState.INITIALIZING,
|
||||
rm.haService.getServiceStatus().getState());
|
||||
assertFalse("RM is ready to become active before being started",
|
||||
rm.haService.getServiceStatus().isReadyToBecomeActive());
|
||||
checkMonitorHealth();
|
||||
|
||||
rm.start();
|
||||
checkMonitorHealth();
|
||||
checkStandbyRMFunctionality();
|
||||
|
||||
// 1. Transition to Standby - must be a no-op
|
||||
rm.haService.transitionToStandby(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkStandbyRMFunctionality();
|
||||
|
||||
// 2. Transition to active
|
||||
rm.haService.transitionToActive(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkActiveRMFunctionality();
|
||||
|
||||
// 3. Transition to active - no-op
|
||||
rm.haService.transitionToActive(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkActiveRMFunctionality();
|
||||
|
||||
// 4. Transition to standby
|
||||
rm.haService.transitionToStandby(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkStandbyRMFunctionality();
|
||||
|
||||
// 5. Transition to active to check Active->Standby->Active works
|
||||
rm.haService.transitionToActive(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkActiveRMFunctionality();
|
||||
|
||||
// 6. Stop the RM. All services should stop and RM should not be ready to
|
||||
// become active
|
||||
rm.stop();
|
||||
assertEquals(STATE_ERR, HAServiceState.STOPPING,
|
||||
rm.haService.getServiceStatus().getState());
|
||||
assertFalse("RM is ready to become active even after it is stopped",
|
||||
rm.haService.getServiceStatus().isReadyToBecomeActive());
|
||||
assertFalse("Active RM services are started",
|
||||
rm.areActiveServicesRunning());
|
||||
checkMonitorHealth();
|
||||
}
|
||||
}
|
|
@ -283,7 +283,7 @@ public class TestResourceTrackerService {
|
|||
RegisterNodeManagerResponse response = nm.registerNode();
|
||||
|
||||
// Verify the RMIdentifier is correctly set in RegisterNodeManagerResponse
|
||||
Assert.assertEquals(ResourceManager.clusterTimeStamp,
|
||||
Assert.assertEquals(ResourceManager.getClusterTimeStamp(),
|
||||
response.getRMIdentifier());
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ public class TestApplicationMasterService {
|
|||
ContainerTokenIdentifier tokenId =
|
||||
BuilderUtils.newContainerTokenIdentifier(allocatedContainer
|
||||
.getContainerToken());
|
||||
Assert.assertEquals(MockRM.clusterTimeStamp, tokenId.getRMIdentifer());
|
||||
Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifer());
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
|
|
|
@ -295,10 +295,10 @@ public class TestRMWebServices extends JerseyTest {
|
|||
String hadoopVersion, String resourceManagerVersionBuiltOn,
|
||||
String resourceManagerBuildVersion, String resourceManagerVersion) {
|
||||
|
||||
assertEquals("clusterId doesn't match: ", ResourceManager.clusterTimeStamp,
|
||||
clusterid);
|
||||
assertEquals("startedOn doesn't match: ", ResourceManager.clusterTimeStamp,
|
||||
startedon);
|
||||
assertEquals("clusterId doesn't match: ",
|
||||
ResourceManager.getClusterTimeStamp(), clusterid);
|
||||
assertEquals("startedOn doesn't match: ",
|
||||
ResourceManager.getClusterTimeStamp(), startedon);
|
||||
assertTrue("stated doesn't match: " + state,
|
||||
state.matches(STATE.INITED.toString()));
|
||||
|
||||
|
|
|
@ -1181,8 +1181,8 @@ public class TestRMWebServicesApps extends JerseyTest {
|
|||
trackingUI);
|
||||
WebServicesTestUtils.checkStringMatch("diagnostics", app.getDiagnostics()
|
||||
.toString(), diagnostics);
|
||||
assertEquals("clusterId doesn't match", ResourceManager.clusterTimeStamp,
|
||||
clusterId);
|
||||
assertEquals("clusterId doesn't match",
|
||||
ResourceManager.getClusterTimeStamp(), clusterId);
|
||||
assertEquals("startedTime doesn't match", app.getStartTime(), startedTime);
|
||||
assertEquals("finishedTime doesn't match", app.getFinishTime(),
|
||||
finishedTime);
|
||||
|
|
Loading…
Reference in New Issue