YARN-5519. Add SubClusterId in AddApplicationHomeSubClusterResponse for Router Failover. (Ellen Hui via Subru)

This commit is contained in:
Subru Krishnan 2016-08-15 14:47:02 -07:00 committed by Carlo Curino
parent b747d59f41
commit e4f928cf9c
6 changed files with 120 additions and 63 deletions

View File

@ -51,15 +51,20 @@ public interface FederationApplicationHomeSubClusterStore {
/**
* Register the home {@code SubClusterId} of the newly submitted
* {@code ApplicationId}. Currently response is empty if the operation was
* successful, if not an exception reporting reason for a failure.
* successful, if not an exception reporting reason for a failure. If a
* mapping for the application already existed, the {@code SubClusterId} in
* this response will return the existing mapping which might be different
* from that in the {@code AddApplicationHomeSubClusterRequest}.
*
* @param request the request to register a new application with its home
* sub-cluster
* @return empty on successful registration of the application in the
* StateStore, if not an exception reporting reason for a failure
* @return upon successful registration of the application in the StateStore,
* {@code AddApplicationHomeSubClusterRequest} containing the home
* sub-cluster of the application. Otherwise, an exception reporting
* reason for a failure
* @throws YarnException if the request is invalid/fails
*/
AddApplicationHomeSubClusterResponse addApplicationHomeSubClusterMap(
AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest request) throws YarnException;
/**
@ -73,7 +78,7 @@ public interface FederationApplicationHomeSubClusterStore {
* not an exception reporting reason for a failure
* @throws YarnException if the request is invalid/fails
*/
UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubClusterMap(
UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
UpdateApplicationHomeSubClusterRequest request) throws YarnException;
/**
@ -85,7 +90,7 @@ public interface FederationApplicationHomeSubClusterStore {
* subcluster
* @throws YarnException if the request is invalid/fails
*/
GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap(
GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest request) throws YarnException;
/**
@ -96,7 +101,7 @@ public interface FederationApplicationHomeSubClusterStore {
* @return the mapping of all submitted application to it's home sub-cluster
* @throws YarnException if the request is invalid/fails
*/
GetApplicationsHomeSubClusterResponse getApplicationsHomeSubClusterMap(
GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException;
/**
@ -110,7 +115,7 @@ public interface FederationApplicationHomeSubClusterStore {
* not an exception reporting reason for a failure
* @throws YarnException if the request is invalid/fails
*/
DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubClusterMap(
DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest request) throws YarnException;
}

View File

@ -89,7 +89,6 @@ public class MemoryFederationStateStore implements FederationStateStore {
public SubClusterRegisterResponse registerSubCluster(
SubClusterRegisterRequest request) throws YarnException {
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
subClusterInfo.setLastStartTime(clock.getTime());
membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
return SubClusterRegisterResponse.newInstance();
}
@ -156,21 +155,22 @@ public class MemoryFederationStateStore implements FederationStateStore {
// FederationApplicationHomeSubClusterStore methods
@Override
public AddApplicationHomeSubClusterResponse addApplicationHomeSubClusterMap(
public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest request) throws YarnException {
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
if (applications.containsKey(appId)) {
throw new YarnException("Application " + appId + " already exists");
}
if (!applications.containsKey(appId)) {
applications.put(appId,
request.getApplicationHomeSubCluster().getHomeSubCluster());
return AddApplicationHomeSubClusterResponse.newInstance();
}
return AddApplicationHomeSubClusterResponse
.newInstance(applications.get(appId));
}
@Override
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubClusterMap(
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
@ -184,7 +184,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
public GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap(
public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest request) throws YarnException {
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
@ -196,7 +196,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubClusterMap(
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
List<ApplicationHomeSubCluster> result =
new ArrayList<ApplicationHomeSubCluster>();
@ -210,7 +210,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubClusterMap(
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {

View File

@ -24,10 +24,13 @@ import org.apache.hadoop.yarn.util.Records;
/**
* AddApplicationHomeSubClusterResponse contains the answer from the
* {@code FederationApplicationHomeSubClusterStore} to a request to insert a
* newly generated applicationId and its owner. Currently response is empty if
* the operation was successful, if not an exception reporting reason for a
* failure.
* newly generated applicationId and its owner.
*
* The response contains application's home sub-cluster as it is stored in the
* {@code FederationApplicationHomeSubClusterStore}. If a mapping for the
* application already existed, the {@code SubClusterId} in this response will
* return the existing mapping which might be different from that in the
* {@code AddApplicationHomeSubClusterRequest}.
*/
@Private
@Unstable
@ -35,10 +38,28 @@ public abstract class AddApplicationHomeSubClusterResponse {
@Private
@Unstable
public static AddApplicationHomeSubClusterResponse newInstance() {
public static AddApplicationHomeSubClusterResponse newInstance(
SubClusterId homeSubCluster) {
AddApplicationHomeSubClusterResponse response =
Records.newRecord(AddApplicationHomeSubClusterResponse.class);
response.setHomeSubCluster(homeSubCluster);
return response;
}
/**
* Set the home sub-cluster that this application has been assigned to.
*
* @param homeSubCluster the {@link SubClusterId} of this application's home
* sub-cluster
*/
public abstract void setHomeSubCluster(SubClusterId homeSubCluster);
/**
* Get the home sub-cluster that this application has been assigned to. This
* may not match the {@link SubClusterId} in the corresponding response, if
* the mapping for the request's application already existed.
*
* @return the {@link SubClusterId} of this application's home sub-cluster
*/
public abstract SubClusterId getHomeSubCluster();
}

View File

@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.AddApplicationHomeSubClusterResponseProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.AddApplicationHomeSubClusterResponseProtoOrBuilder;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import com.google.protobuf.TextFormat;
@ -48,12 +51,40 @@ public class AddApplicationHomeSubClusterResponsePBImpl
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AddApplicationHomeSubClusterResponseProto.newBuilder(proto);
}
viaProto = false;
}
public AddApplicationHomeSubClusterResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public void setHomeSubCluster(SubClusterId homeSubCluster) {
maybeInitBuilder();
if (homeSubCluster == null) {
builder.clearHomeSubCluster();
return;
}
builder.setHomeSubCluster(convertToProtoFormat(homeSubCluster));
}
@Override
public SubClusterId getHomeSubCluster() {
AddApplicationHomeSubClusterResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasHomeSubCluster()) {
return null;
}
return convertFromProtoFormat(p.getHomeSubCluster());
}
@Override
public int hashCode() {
return getProto().hashCode();
@ -75,4 +106,12 @@ public class AddApplicationHomeSubClusterResponsePBImpl
return TextFormat.shortDebugString(getProto());
}
private SubClusterId convertFromProtoFormat(SubClusterIdProto sc) {
return new SubClusterIdPBImpl(sc);
}
private SubClusterIdProto convertToProtoFormat(SubClusterId sc) {
return ((SubClusterIdPBImpl) sc).getProto();
}
}

View File

@ -102,6 +102,7 @@ message AddApplicationHomeSubClusterRequestProto {
}
message AddApplicationHomeSubClusterResponseProto {
optional SubClusterIdProto home_sub_cluster = 1;
}
message UpdateApplicationHomeSubClusterRequestProto {

View File

@ -94,7 +94,7 @@ public abstract class FederationStateStoreBaseTest {
@Test
public void testDeregisterSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
registerSubCluster(subClusterId);
registerSubCluster(createSubClusterInfo(subClusterId));
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
@ -124,7 +124,7 @@ public abstract class FederationStateStoreBaseTest {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
registerSubCluster(subClusterId);
registerSubCluster(subClusterInfo);
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
@ -184,10 +184,10 @@ public abstract class FederationStateStoreBaseTest {
@Test
public void testSubClusterHeartbeat() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
registerSubCluster(subClusterId);
registerSubCluster(createSubClusterInfo(subClusterId));
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");
stateStore.subClusterHeartbeat(heartbeatRequest);
Assert.assertEquals(SubClusterState.SC_RUNNING,
@ -199,7 +199,7 @@ public abstract class FederationStateStoreBaseTest {
public void testSubClusterHeartbeatUnknownSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");
try {
stateStore.subClusterHeartbeat(heartbeatRequest);
@ -213,7 +213,7 @@ public abstract class FederationStateStoreBaseTest {
// Test FederationApplicationHomeSubClusterStore
@Test
public void testAddApplicationHomeSubClusterMap() throws Exception {
public void testAddApplicationHomeSubCluster() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
ApplicationHomeSubCluster ahsc =
@ -222,15 +222,15 @@ public abstract class FederationStateStoreBaseTest {
AddApplicationHomeSubClusterRequest request =
AddApplicationHomeSubClusterRequest.newInstance(ahsc);
AddApplicationHomeSubClusterResponse response =
stateStore.addApplicationHomeSubClusterMap(request);
stateStore.addApplicationHomeSubCluster(request);
Assert.assertNotNull(response);
Assert.assertEquals(subClusterId, response.getHomeSubCluster());
Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId));
}
@Test
public void testAddApplicationHomeSubClusterMapAppAlreadyExists()
public void testAddApplicationHomeSubClusterAppAlreadyExists()
throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
@ -240,21 +240,17 @@ public abstract class FederationStateStoreBaseTest {
ApplicationHomeSubCluster ahsc2 =
ApplicationHomeSubCluster.newInstance(appId, subClusterId2);
try {
stateStore.addApplicationHomeSubClusterMap(
AddApplicationHomeSubClusterResponse response =
stateStore.addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest.newInstance(ahsc2));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId.toString() + " already exists"));
}
Assert.assertEquals(subClusterId1, response.getHomeSubCluster());
Assert.assertEquals(subClusterId1, queryApplicationHomeSC(appId));
}
@Test
public void testDeleteApplicationHomeSubClusterMap() throws Exception {
public void testDeleteApplicationHomeSubCluster() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
addApplicationHomeSC(appId, subClusterId);
@ -263,7 +259,7 @@ public abstract class FederationStateStoreBaseTest {
DeleteApplicationHomeSubClusterRequest.newInstance(appId);
DeleteApplicationHomeSubClusterResponse response =
stateStore.deleteApplicationHomeSubClusterMap(delRequest);
stateStore.deleteApplicationHomeSubCluster(delRequest);
Assert.assertNotNull(response);
try {
@ -277,14 +273,13 @@ public abstract class FederationStateStoreBaseTest {
}
@Test
public void testDeleteApplicationHomeSubClusterMapUnknownApp()
throws Exception {
public void testDeleteApplicationHomeSubClusterUnknownApp() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
DeleteApplicationHomeSubClusterRequest delRequest =
DeleteApplicationHomeSubClusterRequest.newInstance(appId);
try {
stateStore.deleteApplicationHomeSubClusterMap(delRequest);
stateStore.deleteApplicationHomeSubCluster(delRequest);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
@ -293,7 +288,7 @@ public abstract class FederationStateStoreBaseTest {
}
@Test
public void testGetApplicationHomeSubClusterMap() throws Exception {
public void testGetApplicationHomeSubCluster() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
addApplicationHomeSC(appId, subClusterId);
@ -302,7 +297,7 @@ public abstract class FederationStateStoreBaseTest {
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse result =
stateStore.getApplicationHomeSubClusterMap(getRequest);
stateStore.getApplicationHomeSubCluster(getRequest);
Assert.assertEquals(appId,
result.getApplicationHomeSubCluster().getApplicationId());
@ -311,13 +306,13 @@ public abstract class FederationStateStoreBaseTest {
}
@Test
public void testGetApplicationHomeSubClusterMapUnknownApp() throws Exception {
public void testGetApplicationHomeSubClusterUnknownApp() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(appId);
try {
stateStore.getApplicationHomeSubClusterMap(request);
stateStore.getApplicationHomeSubCluster(request);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
@ -326,7 +321,7 @@ public abstract class FederationStateStoreBaseTest {
}
@Test
public void testGetApplicationsHomeSubClusterMap() throws Exception {
public void testGetApplicationsHomeSubCluster() throws Exception {
ApplicationId appId1 = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
ApplicationHomeSubCluster ahsc1 =
@ -344,7 +339,7 @@ public abstract class FederationStateStoreBaseTest {
GetApplicationsHomeSubClusterRequest.newInstance();
GetApplicationsHomeSubClusterResponse result =
stateStore.getApplicationsHomeSubClusterMap(getRequest);
stateStore.getApplicationsHomeSubCluster(getRequest);
Assert.assertEquals(2, result.getAppsHomeSubClusters().size());
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1));
@ -352,7 +347,7 @@ public abstract class FederationStateStoreBaseTest {
}
@Test
public void testUpdateApplicationHomeSubClusterMap() throws Exception {
public void testUpdateApplicationHomeSubCluster() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
addApplicationHomeSC(appId, subClusterId1);
@ -365,16 +360,14 @@ public abstract class FederationStateStoreBaseTest {
UpdateApplicationHomeSubClusterRequest.newInstance(ahscUpdate);
UpdateApplicationHomeSubClusterResponse response =
stateStore.updateApplicationHomeSubClusterMap(updateRequest);
stateStore.updateApplicationHomeSubCluster(updateRequest);
Assert.assertNotNull(response);
Assert.assertEquals(subClusterId2, queryApplicationHomeSC(appId));
}
@Test
public void testUpdateApplicationHomeSubClusterMapUnknownApp()
throws Exception {
public void testUpdateApplicationHomeSubClusterUnknownApp() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
ApplicationHomeSubCluster ahsc =
@ -384,7 +377,7 @@ public abstract class FederationStateStoreBaseTest {
UpdateApplicationHomeSubClusterRequest.newInstance(ahsc);
try {
stateStore.updateApplicationHomeSubClusterMap((updateRequest));
stateStore.updateApplicationHomeSubCluster((updateRequest));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
@ -484,7 +477,7 @@ public abstract class FederationStateStoreBaseTest {
return SubClusterInfo.newInstance(subClusterId, amRMAddress,
clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW,
CLOCK.getTime(), "cabability");
CLOCK.getTime(), "capability");
}
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
@ -499,7 +492,7 @@ public abstract class FederationStateStoreBaseTest {
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
AddApplicationHomeSubClusterRequest request =
AddApplicationHomeSubClusterRequest.newInstance(ahsc);
stateStore.addApplicationHomeSubClusterMap(request);
stateStore.addApplicationHomeSubCluster(request);
}
private void setPolicyConf(String queue, String policyType)
@ -510,10 +503,8 @@ public abstract class FederationStateStoreBaseTest {
stateStore.setPolicyConfiguration(request);
}
private void registerSubCluster(SubClusterId subClusterId)
private void registerSubCluster(SubClusterInfo subClusterInfo)
throws YarnException {
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
@ -531,7 +522,7 @@ public abstract class FederationStateStoreBaseTest {
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubClusterMap(request);
stateStore.getApplicationHomeSubCluster(request);
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}