YARN-1623. Include queue name in RegisterApplicationMasterResponse (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1560545 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8ceddeb0c3
commit
163348701c
@ -234,6 +234,9 @@ Release 2.4.0 - UNRELEASED
|
|||||||
YARN-1624. QueuePlacementPolicy format is not easily readable via a JAXB
|
YARN-1624. QueuePlacementPolicy format is not easily readable via a JAXB
|
||||||
parser (Aditya Acharya via Sandy Ryza)
|
parser (Aditya Acharya via Sandy Ryza)
|
||||||
|
|
||||||
|
YARN-1623. Include queue name in RegisterApplicationMasterResponse (Sandy
|
||||||
|
Ryza)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -55,13 +55,14 @@ public abstract class RegisterApplicationMasterResponse {
|
|||||||
public static RegisterApplicationMasterResponse newInstance(
|
public static RegisterApplicationMasterResponse newInstance(
|
||||||
Resource minCapability, Resource maxCapability,
|
Resource minCapability, Resource maxCapability,
|
||||||
Map<ApplicationAccessType, String> acls, ByteBuffer key,
|
Map<ApplicationAccessType, String> acls, ByteBuffer key,
|
||||||
List<Container> containersFromPreviousAttempt) {
|
List<Container> containersFromPreviousAttempt, String queue) {
|
||||||
RegisterApplicationMasterResponse response =
|
RegisterApplicationMasterResponse response =
|
||||||
Records.newRecord(RegisterApplicationMasterResponse.class);
|
Records.newRecord(RegisterApplicationMasterResponse.class);
|
||||||
response.setMaximumResourceCapability(maxCapability);
|
response.setMaximumResourceCapability(maxCapability);
|
||||||
response.setApplicationACLs(acls);
|
response.setApplicationACLs(acls);
|
||||||
response.setClientToAMTokenMasterKey(key);
|
response.setClientToAMTokenMasterKey(key);
|
||||||
response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
|
response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
|
||||||
|
response.setQueue(queue);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,6 +112,20 @@ public static RegisterApplicationMasterResponse newInstance(
|
|||||||
@Stable
|
@Stable
|
||||||
public abstract void setClientToAMTokenMasterKey(ByteBuffer key);
|
public abstract void setClientToAMTokenMasterKey(ByteBuffer key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Get the queue that the application was placed in.<p>
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public abstract String getQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Set the queue that the application was placed in.<p>
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public abstract void setQueue(String queue);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Get the list of running containers as viewed by
|
* Get the list of running containers as viewed by
|
||||||
|
@ -45,6 +45,7 @@ message RegisterApplicationMasterResponseProto {
|
|||||||
optional bytes client_to_am_token_master_key = 2;
|
optional bytes client_to_am_token_master_key = 2;
|
||||||
repeated ApplicationACLMapProto application_ACLs = 3;
|
repeated ApplicationACLMapProto application_ACLs = 3;
|
||||||
repeated ContainerProto containers_from_previous_attempt = 4;
|
repeated ContainerProto containers_from_previous_attempt = 4;
|
||||||
|
optional string queue = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message FinishApplicationMasterRequestProto {
|
message FinishApplicationMasterRequestProto {
|
||||||
|
@ -252,6 +252,25 @@ public void setContainersFromPreviousAttempt(final List<Container> containers) {
|
|||||||
this.containersFromPreviousAttempt = new ArrayList<Container>();
|
this.containersFromPreviousAttempt = new ArrayList<Container>();
|
||||||
this.containersFromPreviousAttempt.addAll(containers);
|
this.containersFromPreviousAttempt.addAll(containers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getQueue() {
|
||||||
|
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (!p.hasQueue()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return p.getQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setQueue(String queue) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (queue == null) {
|
||||||
|
builder.clearQueue();
|
||||||
|
} else {
|
||||||
|
builder.setQueue(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void initRunningContainersList() {
|
private void initRunningContainersList() {
|
||||||
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
|
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
@ -267,6 +267,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
|
|||||||
.getMaximumResourceCapability());
|
.getMaximumResourceCapability());
|
||||||
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
|
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
|
||||||
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
|
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
|
||||||
|
response.setQueue(app.getQueue());
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
LOG.info("Setting client token master key");
|
LOG.info("Setting client token master key");
|
||||||
response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext
|
response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
@ -36,6 +37,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -150,4 +152,33 @@ public void testInvalidContainerReleaseRequest() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 60000)
|
||||||
|
public void testNotifyAMOfPlacedQueue() throws Exception {
|
||||||
|
// By default, FairScheduler assigns queue by user name
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
try {
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
// Register node1
|
||||||
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
|
||||||
|
|
||||||
|
// Submit an application
|
||||||
|
RMApp app1 = rm.submitApp(1024, "somename", "user1");
|
||||||
|
|
||||||
|
// kick the scheduling
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||||
|
|
||||||
|
RegisterApplicationMasterResponse response = am1.registerAppAttempt();
|
||||||
|
Assert.assertEquals("root.user1", response.getQueue());
|
||||||
|
} finally {
|
||||||
|
if (rm != null) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user