YARN-4170. AM need to be notified with priority in AllocateResponse. Contributed by Sunil G
(cherry picked from commit f9da5cdb2b
)
This commit is contained in:
parent
204ee6a40f
commit
1ba73169c7
|
@ -458,6 +458,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-4162. CapacityScheduler: Add resource usage by partition and queue capacity
|
YARN-4162. CapacityScheduler: Add resource usage by partition and queue capacity
|
||||||
by partition to REST API. (Naganarasimha G R via wangda)
|
by partition to REST API. (Naganarasimha G R via wangda)
|
||||||
|
|
||||||
|
YARN-4170. AM need to be notified with priority in AllocateResponse.
|
||||||
|
(Sunil G via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
@ -314,4 +315,17 @@ public abstract class AllocateResponse {
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setAMRMToken(Token amRMToken);
|
public abstract void setAMRMToken(Token amRMToken);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the application
|
||||||
|
*
|
||||||
|
* @return get application priority
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract Priority getApplicationPriority();
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public abstract void setApplicationPriority(Priority priority);
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,6 +88,7 @@ message AllocateResponseProto {
|
||||||
repeated ContainerProto increased_containers = 10;
|
repeated ContainerProto increased_containers = 10;
|
||||||
repeated ContainerProto decreased_containers = 11;
|
repeated ContainerProto decreased_containers = 11;
|
||||||
optional hadoop.common.TokenProto am_rm_token = 12;
|
optional hadoop.common.TokenProto am_rm_token = 12;
|
||||||
|
optional PriorityProto application_priority = 13;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum SchedulerResourceTypes {
|
enum SchedulerResourceTypes {
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||||
|
@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
||||||
|
@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
|
||||||
|
@ -72,7 +75,8 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
||||||
private List<NodeReport> updatedNodes = null;
|
private List<NodeReport> updatedNodes = null;
|
||||||
private PreemptionMessage preempt;
|
private PreemptionMessage preempt;
|
||||||
private Token amrmToken = null;
|
private Token amrmToken = null;
|
||||||
|
private Priority appPriority = null;
|
||||||
|
|
||||||
public AllocateResponsePBImpl() {
|
public AllocateResponsePBImpl() {
|
||||||
builder = AllocateResponseProto.newBuilder();
|
builder = AllocateResponseProto.newBuilder();
|
||||||
}
|
}
|
||||||
|
@ -154,6 +158,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
||||||
if (this.amrmToken != null) {
|
if (this.amrmToken != null) {
|
||||||
builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
|
builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
|
||||||
}
|
}
|
||||||
|
if (this.appPriority != null) {
|
||||||
|
builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void mergeLocalToProto() {
|
private synchronized void mergeLocalToProto() {
|
||||||
|
@ -378,6 +385,27 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
||||||
this.amrmToken = amRMToken;
|
this.amrmToken = amRMToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Priority getApplicationPriority() {
|
||||||
|
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (this.appPriority != null) {
|
||||||
|
return this.appPriority;
|
||||||
|
}
|
||||||
|
if (!p.hasApplicationPriority()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.appPriority = convertFromProtoFormat(p.getApplicationPriority());
|
||||||
|
return this.appPriority;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setApplicationPriority(Priority priority) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (priority == null)
|
||||||
|
builder.clearApplicationPriority();
|
||||||
|
this.appPriority = priority;
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void initLocalIncreasedContainerList() {
|
private synchronized void initLocalIncreasedContainerList() {
|
||||||
if (this.increasedContainers != null) {
|
if (this.increasedContainers != null) {
|
||||||
return;
|
return;
|
||||||
|
@ -644,4 +672,12 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
||||||
private TokenProto convertToProtoFormat(Token t) {
|
private TokenProto convertToProtoFormat(Token t) {
|
||||||
return ((TokenPBImpl)t).getProto();
|
return ((TokenPBImpl)t).getProto();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
|
||||||
|
return new PriorityPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private PriorityProto convertToProtoFormat(Priority t) {
|
||||||
|
return ((PriorityPBImpl)t).getProto();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -563,6 +563,10 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
allocateResponse
|
allocateResponse
|
||||||
.setPreemptionMessage(generatePreemptionMessage(allocation));
|
.setPreemptionMessage(generatePreemptionMessage(allocation));
|
||||||
|
|
||||||
|
// Set application priority
|
||||||
|
allocateResponse.setApplicationPriority(app
|
||||||
|
.getApplicationSubmissionContext().getPriority());
|
||||||
|
|
||||||
// update AMRMToken if the token is rolled-up
|
// update AMRMToken if the token is rolled-up
|
||||||
MasterKeyData nextMasterKey =
|
MasterKeyData nextMasterKey =
|
||||||
this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
|
this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
|
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
@ -437,6 +438,48 @@ public class TestApplicationMasterService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testPriorityInAllocatedResponse() throws Exception {
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
// Set Max Application Priority as 10
|
||||||
|
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
// Register node1
|
||||||
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
|
||||||
|
|
||||||
|
// Submit an application
|
||||||
|
Priority appPriority1 = Priority.newInstance(5);
|
||||||
|
RMApp app1 = rm.submitApp(2048, appPriority1);
|
||||||
|
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||||
|
am1.registerAppAttempt();
|
||||||
|
|
||||||
|
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
|
||||||
|
List<ContainerId> release = new ArrayList<ContainerId>();
|
||||||
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||||
|
allocateRequest.setReleaseList(release);
|
||||||
|
allocateRequest.setAskList(ask);
|
||||||
|
|
||||||
|
AllocateResponse response1 = am1.allocate(allocateRequest);
|
||||||
|
Assert.assertEquals(appPriority1, response1.getApplicationPriority());
|
||||||
|
|
||||||
|
// get scheduler
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
// Change the priority of App1 to 8
|
||||||
|
Priority appPriority2 = Priority.newInstance(8);
|
||||||
|
cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
|
||||||
|
|
||||||
|
AllocateResponse response2 = am1.allocate(allocateRequest);
|
||||||
|
Assert.assertEquals(appPriority2, response2.getApplicationPriority());
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
private static class MyResourceManager extends MockRM {
|
private static class MyResourceManager extends MockRM {
|
||||||
|
|
||||||
public MyResourceManager(YarnConfiguration conf) {
|
public MyResourceManager(YarnConfiguration conf) {
|
||||||
|
|
|
@ -976,16 +976,6 @@ public class TestCapacityScheduler {
|
||||||
CapacityScheduler.schedule(cs);
|
CapacityScheduler.schedule(cs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
|
||||||
throws Exception {
|
|
||||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
|
||||||
nm.nodeHeartbeat(true);
|
|
||||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
|
||||||
am.registerAppAttempt();
|
|
||||||
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
|
|
||||||
return am;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void waitForAppPreemptionInfo(RMApp app, Resource preempted,
|
private void waitForAppPreemptionInfo(RMApp app, Resource preempted,
|
||||||
int numAMPreempted, int numTaskPreempted,
|
int numAMPreempted, int numTaskPreempted,
|
||||||
|
@ -1156,7 +1146,8 @@ public class TestCapacityScheduler {
|
||||||
|
|
||||||
// create app and launch the AM
|
// create app and launch the AM
|
||||||
RMApp app0 = rm1.submitApp(CONTAINER_MEMORY);
|
RMApp app0 = rm1.submitApp(CONTAINER_MEMORY);
|
||||||
MockAM am0 = launchAM(app0, rm1, nm1);
|
MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||||
|
am0.registerAppAttempt();
|
||||||
|
|
||||||
// get scheduler app
|
// get scheduler app
|
||||||
FiCaSchedulerApp schedulerAppAttempt =
|
FiCaSchedulerApp schedulerAppAttempt =
|
||||||
|
@ -1190,7 +1181,9 @@ public class TestCapacityScheduler {
|
||||||
Resource.newInstance(0, 0), false, 0);
|
Resource.newInstance(0, 0), false, 0);
|
||||||
|
|
||||||
// launch app0-attempt1
|
// launch app0-attempt1
|
||||||
MockAM am1 = launchAM(app0, rm1, nm1);
|
MockAM am1 = MockRM.launchAM(app0, rm1, nm1);
|
||||||
|
am1.registerAppAttempt();
|
||||||
|
|
||||||
schedulerAppAttempt =
|
schedulerAppAttempt =
|
||||||
cs.getSchedulerApplications().get(app0.getApplicationId())
|
cs.getSchedulerApplications().get(app0.getApplicationId())
|
||||||
.getCurrentAppAttempt();
|
.getCurrentAppAttempt();
|
||||||
|
|
Loading…
Reference in New Issue