YARN-333. Schedulers cannot control the queue-name of an application. (sandyr via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1502374 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8767e4cde1
commit
1714932158
|
@ -751,6 +751,9 @@ Release 2.1.0-beta - 2013-07-02
|
||||||
YARN-799. Fix CgroupsLCEResourcesHandler to use /tasks instead of
|
YARN-799. Fix CgroupsLCEResourcesHandler to use /tasks instead of
|
||||||
/cgroup.procs. (Chris Riccomini via acmurthy)
|
/cgroup.procs. (Chris Riccomini via acmurthy)
|
||||||
|
|
||||||
|
YARN-333. Schedulers cannot control the queue-name of an
|
||||||
|
application. (sandyr via tucu)
|
||||||
|
|
||||||
Release 2.0.5-alpha - 06/06/2013
|
Release 2.0.5-alpha - 06/06/2013
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The read interface to an Application in the ResourceManager. Take a
|
* The interface to an Application in the ResourceManager. Take a
|
||||||
* look at {@link RMAppImpl} for its implementation. This interface
|
* look at {@link RMAppImpl} for its implementation. This interface
|
||||||
* exposes methods to access various updates in application status/report.
|
* exposes methods to access various updates in application status/report.
|
||||||
*/
|
*/
|
||||||
|
@ -86,6 +86,13 @@ public interface RMApp extends EventHandler<RMAppEvent> {
|
||||||
*/
|
*/
|
||||||
String getQueue();
|
String getQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reflects a change in the application's queue from the one specified in the
|
||||||
|
* {@link ApplicationSubmissionContext}.
|
||||||
|
* @param name the new queue name
|
||||||
|
*/
|
||||||
|
void setQueue(String name);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The name of the application as set in {@link
|
* The name of the application as set in {@link
|
||||||
* ApplicationSubmissionContext#setApplicationName(String)}.
|
* ApplicationSubmissionContext#setApplicationName(String)}.
|
||||||
|
|
|
@ -81,7 +81,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
private final RMContext rmContext;
|
private final RMContext rmContext;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final String user;
|
private final String user;
|
||||||
private final String queue;
|
|
||||||
private final String name;
|
private final String name;
|
||||||
private final ApplicationSubmissionContext submissionContext;
|
private final ApplicationSubmissionContext submissionContext;
|
||||||
private final Dispatcher dispatcher;
|
private final Dispatcher dispatcher;
|
||||||
|
@ -101,6 +100,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
private long startTime;
|
private long startTime;
|
||||||
private long finishTime;
|
private long finishTime;
|
||||||
private RMAppAttempt currentAttempt;
|
private RMAppAttempt currentAttempt;
|
||||||
|
private String queue;
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
private EventHandler handler;
|
private EventHandler handler;
|
||||||
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
|
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
|
||||||
|
@ -342,6 +342,11 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
return this.queue;
|
return this.queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setQueue(String queue) {
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return this.name;
|
return this.name;
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
@ -88,6 +89,8 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A scheduler that schedules resources between a set of queues. The scheduler
|
* A scheduler that schedules resources between a set of queues. The scheduler
|
||||||
* keeps track of the resources used by each queue, and attempts to maintain
|
* keeps track of the resources used by each queue, and attempts to maintain
|
||||||
|
@ -601,12 +604,8 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
*/
|
*/
|
||||||
protected synchronized void addApplication(
|
protected synchronized void addApplication(
|
||||||
ApplicationAttemptId applicationAttemptId, String queueName, String user) {
|
ApplicationAttemptId applicationAttemptId, String queueName, String user) {
|
||||||
|
RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId);
|
||||||
FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
|
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
|
||||||
if (queue == null) {
|
|
||||||
// queue is not an existing or createable leaf queue
|
|
||||||
queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
FSSchedulerApp schedulerApp =
|
FSSchedulerApp schedulerApp =
|
||||||
new FSSchedulerApp(applicationAttemptId, user,
|
new FSSchedulerApp(applicationAttemptId, user,
|
||||||
|
@ -638,6 +637,27 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
RMAppAttemptEventType.APP_ACCEPTED));
|
RMAppAttemptEventType.APP_ACCEPTED));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
|
||||||
|
// Potentially set queue to username if configured to do so
|
||||||
|
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
|
||||||
|
userAsDefaultQueue) {
|
||||||
|
queueName = user;
|
||||||
|
}
|
||||||
|
|
||||||
|
FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
|
||||||
|
if (queue == null) {
|
||||||
|
// queue is not an existing or createable leaf queue
|
||||||
|
queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rmApp != null) {
|
||||||
|
rmApp.setQueue(queue.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void removeApplication(
|
private synchronized void removeApplication(
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
RMAppAttemptState rmAppAttemptFinalState) {
|
RMAppAttemptState rmAppAttemptFinalState) {
|
||||||
|
@ -985,13 +1005,6 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
}
|
}
|
||||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
|
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
|
||||||
String queue = appAddedEvent.getQueue();
|
String queue = appAddedEvent.getQueue();
|
||||||
|
|
||||||
// Potentially set queue to username if configured to do so
|
|
||||||
String def = YarnConfiguration.DEFAULT_QUEUE_NAME;
|
|
||||||
if (queue.equals(def) && userAsDefaultQueue) {
|
|
||||||
queue = appAddedEvent.getUser();
|
|
||||||
}
|
|
||||||
|
|
||||||
addApplication(appAddedEvent.getApplicationAttemptId(), queue,
|
addApplication(appAddedEvent.getApplicationAttemptId(), queue,
|
||||||
appAddedEvent.getUser());
|
appAddedEvent.getUser());
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -136,6 +136,11 @@ public abstract class MockAsm extends MockApps {
|
||||||
public String getApplicationType() {
|
public String getApplicationType() {
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setQueue(String name) {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static RMApp newApplication(int i) {
|
public static RMApp newApplication(int i) {
|
||||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
@ -393,7 +394,6 @@ public class TestFairScheduler {
|
||||||
scheduler.handle(nodeEvent1);
|
scheduler.handle(nodeEvent1);
|
||||||
|
|
||||||
// Have two queues which want entire cluster capacity
|
// Have two queues which want entire cluster capacity
|
||||||
createSchedulingRequest(10 * 1024, "default", "user1");
|
|
||||||
createSchedulingRequest(10 * 1024, "parent.queue2", "user1");
|
createSchedulingRequest(10 * 1024, "parent.queue2", "user1");
|
||||||
createSchedulingRequest(10 * 1024, "parent.queue3", "user1");
|
createSchedulingRequest(10 * 1024, "parent.queue3", "user1");
|
||||||
|
|
||||||
|
@ -559,6 +559,25 @@ public class TestFairScheduler {
|
||||||
.getAppSchedulables().size());
|
.getAppSchedulables().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAssignToQueue() throws Exception {
|
||||||
|
Configuration conf = createConfiguration();
|
||||||
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
|
||||||
|
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
|
||||||
|
|
||||||
|
FSLeafQueue queue1 = scheduler.assignToQueue(rmApp1, "default", "asterix");
|
||||||
|
FSLeafQueue queue2 = scheduler.assignToQueue(rmApp2, "notdefault", "obelix");
|
||||||
|
|
||||||
|
// assert FSLeafQueue's name is the correct name is the one set in the RMApp
|
||||||
|
assertEquals(rmApp1.getQueue(), queue1.getName());
|
||||||
|
assertEquals("root.asterix", rmApp1.getQueue());
|
||||||
|
assertEquals(rmApp2.getQueue(), queue2.getName());
|
||||||
|
assertEquals("root.notdefault", rmApp2.getQueue());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFairShareWithMinAlloc() throws Exception {
|
public void testFairShareWithMinAlloc() throws Exception {
|
||||||
Configuration conf = createConfiguration();
|
Configuration conf = createConfiguration();
|
||||||
|
|
Loading…
Reference in New Issue