YARN-9049. Add application submit data to state store. (#5606)

This commit is contained in:
slfan1989 2023-05-04 00:19:54 +08:00 committed by GitHub
parent 0e46388474
commit c1d10f3872
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 197 additions and 1 deletions

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.util.Records;
/**
@ -64,6 +65,18 @@ public abstract class ApplicationHomeSubCluster {
return appMapping;
}
@Private
@Unstable
public static ApplicationHomeSubCluster newInstance(ApplicationId appId, long createTime,
SubClusterId homeSubCluster, ApplicationSubmissionContext appSubmissionContext) {
ApplicationHomeSubCluster appMapping = Records.newRecord(ApplicationHomeSubCluster.class);
appMapping.setApplicationId(appId);
appMapping.setHomeSubCluster(homeSubCluster);
appMapping.setApplicationSubmissionContext(appSubmissionContext);
appMapping.setCreateTime(createTime);
return appMapping;
}
/**
* Get the {@link ApplicationId} representing the unique identifier of the
* application.
@ -123,6 +136,24 @@ public abstract class ApplicationHomeSubCluster {
public abstract void setCreateTime(long time);
/**
* Set Application Submission Context.
*
* @param context Application Submission Context.
*/
@Private
@Unstable
public abstract void setApplicationSubmissionContext(ApplicationSubmissionContext context);
/**
* Get Application Submission Context.
*
* @return Application Submission Context.
*/
@Private
@Unstable
public abstract ApplicationSubmissionContext getApplicationSubmissionContext();
@Override
public boolean equals(Object obj) {
@ -139,6 +170,8 @@ public abstract class ApplicationHomeSubCluster {
return new EqualsBuilder()
.append(this.getApplicationId(), other.getApplicationId())
.append(this.getHomeSubCluster(), other.getHomeSubCluster())
.append(this.getApplicationSubmissionContext(),
other.getApplicationSubmissionContext())
.isEquals();
}
@ -150,7 +183,9 @@ public abstract class ApplicationHomeSubCluster {
return new HashCodeBuilder().
append(this.getApplicationId()).
append(this.getHomeSubCluster()).
append(this.getCreateTime()).toHashCode();
append(this.getCreateTime()).
append(this.getApplicationSubmissionContext())
.toHashCode();
}
@Override
@ -160,6 +195,7 @@ public abstract class ApplicationHomeSubCluster {
.append("ApplicationId: ").append(getApplicationId()).append(", ")
.append("HomeSubCluster: ").append(getHomeSubCluster()).append(", ")
.append("CreateTime: ").append(getCreateTime()).append(", ")
.append("ApplicationSubmissionContext: ").append(getApplicationSubmissionContext())
.append("]");
return sb.toString();
}

View File

@ -41,6 +41,17 @@ public abstract class GetApplicationHomeSubClusterRequest {
return appMapping;
}
@Private
@Unstable
public static GetApplicationHomeSubClusterRequest newInstance(
ApplicationId appId, boolean containsAppSubmissionContext) {
GetApplicationHomeSubClusterRequest appMapping =
Records.newRecord(GetApplicationHomeSubClusterRequest.class);
appMapping.setApplicationId(appId);
appMapping.setContainsAppSubmissionContext(containsAppSubmissionContext);
return appMapping;
}
/**
* Get the {@link ApplicationId} representing the unique identifier of the
* application.
@ -61,4 +72,27 @@ public abstract class GetApplicationHomeSubClusterRequest {
@Unstable
public abstract void setApplicationId(ApplicationId applicationId);
/**
* Get the flag that indicates whether appSubmissionContext should be
* returned.
* The reason for adding this variable is due to the consideration that
* appSubmissionContext is not commonly used and its data size can be large.
*
* @return whether to return appSubmissionContext.
*/
@Public
@Unstable
public abstract boolean getContainsAppSubmissionContext();
/**
* Set the flag that indicates whether appSubmissionContext should be
* returned.
*
* @param containsAppSubmissionContext whether to return appSubmissionContext.
*/
@Public
@Unstable
public abstract void setContainsAppSubmissionContext(
boolean containsAppSubmissionContext);
}

View File

@ -20,11 +20,14 @@ package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationHomeSubClusterProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationHomeSubClusterProtoOrBuilder;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -45,6 +48,7 @@ public class ApplicationHomeSubClusterPBImpl extends ApplicationHomeSubCluster {
private ApplicationId applicationId = null;
private SubClusterId homeSubCluster = null;
private long createTime = 0L;
private ApplicationSubmissionContext applicationSubmissionContext;
public ApplicationHomeSubClusterPBImpl() {
builder = ApplicationHomeSubClusterProto.newBuilder();
@ -176,6 +180,29 @@ public class ApplicationHomeSubClusterPBImpl extends ApplicationHomeSubCluster {
builder.setCreateTime(time);
}
@Override
public void setApplicationSubmissionContext(ApplicationSubmissionContext context) {
maybeInitBuilder();
if (applicationSubmissionContext == null) {
builder.clearAppSubmitContext();
}
this.applicationSubmissionContext = context;
builder.setAppSubmitContext(convertToProtoFormat(context));
}
@Override
public ApplicationSubmissionContext getApplicationSubmissionContext() {
ApplicationHomeSubClusterProtoOrBuilder p = viaProto ? proto : builder;
if (this.applicationSubmissionContext != null) {
return this.applicationSubmissionContext;
}
if (!p.hasAppSubmitContext()) {
return null;
}
this.applicationSubmissionContext = convertFromProtoFormat(p.getAppSubmitContext());
return this.applicationSubmissionContext;
}
private SubClusterId convertFromProtoFormat(SubClusterIdProto subClusterId) {
return new SubClusterIdPBImpl(subClusterId);
}
@ -191,4 +218,14 @@ public class ApplicationHomeSubClusterPBImpl extends ApplicationHomeSubCluster {
private ApplicationIdProto convertToProtoFormat(ApplicationId appId) {
return ((ApplicationIdPBImpl) appId).getProto();
}
private ApplicationSubmissionContext convertFromProtoFormat(
ApplicationSubmissionContextProto appSubmitContext) {
return new ApplicationSubmissionContextPBImpl(appSubmitContext);
}
private ApplicationSubmissionContextProto convertToProtoFormat(
ApplicationSubmissionContext appContext) {
return ((ApplicationSubmissionContextPBImpl) appContext).getProto();
}
}

View File

@ -130,6 +130,18 @@ public class GetApplicationHomeSubClusterRequestPBImpl
builder.setApplicationId(convertToProtoFormat(applicationId));
}
@Override
public boolean getContainsAppSubmissionContext() {
GetApplicationHomeSubClusterRequestProtoOrBuilder p = viaProto ? proto : builder;
return p.getContainsAppSubmissionContext();
}
@Override
public void setContainsAppSubmissionContext(boolean containsAppSubmissionContext) {
maybeInitBuilder();
builder.setContainsAppSubmissionContext(containsAppSubmissionContext);
}
private ApplicationId convertFromProtoFormat(ApplicationIdProto appId) {
return new ApplicationIdPBImpl(appId);
}

View File

@ -98,6 +98,7 @@ message ApplicationHomeSubClusterProto {
optional ApplicationIdProto application_id = 1;
optional SubClusterIdProto home_sub_cluster = 2;
optional int64 create_time = 3;
optional ApplicationSubmissionContextProto app_submit_context = 4;
}
message AddApplicationHomeSubClusterRequestProto {
@ -108,6 +109,14 @@ message AddApplicationHomeSubClusterResponseProto {
optional SubClusterIdProto home_sub_cluster = 1;
}
message GetApplicationClusterDataRequestProto {
optional ApplicationIdProto application_id = 1;
}
message GetApplicationClusterDataResponseProto {
optional ApplicationHomeSubClusterProto app_home_subcluster = 1;
}
message UpdateApplicationHomeSubClusterRequestProto {
optional ApplicationHomeSubClusterProto app_subcluster_map = 1;
}
@ -117,6 +126,7 @@ message UpdateApplicationHomeSubClusterResponseProto {
message GetApplicationHomeSubClusterRequestProto {
optional ApplicationIdProto application_id = 1;
optional bool contains_app_submission_context = 2 [default = false];
}
message GetApplicationHomeSubClusterResponseProto {

View File

@ -20,6 +20,14 @@ package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.BasePBImplRecordsTest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.AddApplicationHomeSubClusterRequestProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.AddApplicationHomeSubClusterResponseProto;
@ -102,6 +110,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Mockito.mock;
/**
@ -115,6 +124,14 @@ public class TestFederationProtocolRecords extends BasePBImplRecordsTest {
generateByNewInstance(Version.class);
generateByNewInstance(SubClusterId.class);
generateByNewInstance(SubClusterInfo.class);
generateByNewInstance(Priority.class);
generateByNewInstance(URL.class);
generateByNewInstance(Resource.class);
generateByNewInstance(ContainerRetryContext.class);
generateByNewInstance(LocalResource.class);
generateByNewInstance(ContainerLaunchContext.class);
generateByNewInstance(LogAggregationContext.class);
generateByNewInstance(ApplicationSubmissionContext.class);
generateByNewInstance(ApplicationHomeSubCluster.class);
generateByNewInstance(SubClusterPolicyConfiguration.class);
generateByNewInstance(RouterMasterKey.class);
@ -421,4 +438,54 @@ public class TestFederationProtocolRecords extends BasePBImplRecordsTest {
assertEquals(sc1, sc2);
}
@Test
public void testApplicationHomeSubClusterEqual() throws Exception {
// Case1, We create 2 ApplicationHomeSubCluster,
// all properties are consistent
// We expect the result to be equal.
ApplicationId appId1 = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC");
ApplicationSubmissionContext context1 =
ApplicationSubmissionContext.newInstance(appId1, "test", "default",
Priority.newInstance(0), null, true, true,
2, Resource.newInstance(10, 2), "test");
long createTime = Time.now();
ApplicationHomeSubCluster ahsc1 =
ApplicationHomeSubCluster.newInstance(appId1, createTime, subClusterId1, context1);
ApplicationId appId2 = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId2 = SubClusterId.newInstance("SC");
ApplicationSubmissionContext context2 =
ApplicationSubmissionContext.newInstance(appId1, "test", "default",
Priority.newInstance(0), null, true, true,
2, Resource.newInstance(10, 2), "test");
ApplicationHomeSubCluster ahsc2 =
ApplicationHomeSubCluster.newInstance(appId2, createTime, subClusterId2, context2);
assertEquals(ahsc1, ahsc2);
// Case2, We create 2 ApplicationHomeSubCluster, appId is different
// We expect the results to be unequal
ApplicationId appId3 = ApplicationId.newInstance(2, 1);
ApplicationSubmissionContext context3 =
ApplicationSubmissionContext.newInstance(appId3, "test", "default",
Priority.newInstance(0), null, true, true,
2, Resource.newInstance(10, 2), "test");
ApplicationHomeSubCluster ahsc3 =
ApplicationHomeSubCluster.newInstance(appId3, createTime, subClusterId2, context3);
assertNotEquals(ahsc1, ahsc3);
// Case3, We create 2 ApplicationHomeSubCluster, createTime is different
// We expect the results to be unequal
long createTime2 = Time.now() + 1000;
ApplicationHomeSubCluster ahsc4 =
ApplicationHomeSubCluster.newInstance(appId2, createTime2, subClusterId1, context2);
assertNotEquals(ahsc1, ahsc4);
// Case4, We create 2 ApplicationHomeSubCluster, submissionContext is different
// We expect the results to be unequal
ApplicationHomeSubCluster ahsc5 =
ApplicationHomeSubCluster.newInstance(appId2, createTime2, subClusterId2, context3);
assertNotEquals(ahsc1, ahsc5);
}
}