YARN-2098. App priority support in Fair Scheduler

Signed-off-by: Wanqiang Ji <jiwq@apache.org>
This commit is contained in:
Wanqiang Ji 2020-09-09 23:05:34 +08:00
parent be50d221f5
commit 09ad0c08d8
No known key found for this signature in database
GPG Key ID: 08FACFFFADA3301B
10 changed files with 40 additions and 34 deletions

View File

@ -464,9 +464,9 @@ public class RMAppImpl implements RMApp, Recoverable {
.newInstance(submissionContext.getPriority().getPriority());
} else {
// If incoming app does not have priority configured in submission
// context, system could be assume that its a 0 priority app and could be
// context, system could be assume that its a 1 priority app and could be
// considered as normal.
this.applicationPriority = Priority.newInstance(0);
this.applicationPriority = Priority.newInstance(1);
}
int globalMaxAppAttempts = conf.getInt(

View File

@ -118,13 +118,14 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
public FSAppAttempt(FairScheduler scheduler,
ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue,
ActiveUsersManager activeUsersManager, RMContext rmContext) {
ActiveUsersManager activeUsersManager, RMContext rmContext,
Priority priority) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
this.scheduler = scheduler;
this.startTime = scheduler.getClock().getTime();
this.lastTimeAtFairShare = this.startTime;
this.appPriority = Priority.newInstance(1);
this.appPriority = priority;
this.enableAMPreemption = scheduler.getConf()
.getAMPreemptionEnabled(getQueue().getQueueName());
}
@ -1335,8 +1336,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
@Override
public Priority getPriority() {
// Right now per-app priorities are not passed to scheduler,
// so everyone has the same priority.
return appPriority;
}

View File

@ -474,7 +474,7 @@ public class FairScheduler extends
*/
protected void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering,
ApplicationPlacementContext placementContext) {
ApplicationPlacementContext placementContext, Priority appPriority) {
// If the placement was rejected the placementContext will be null.
// We ignore placement rules on recovery.
if (!isAppRecovering && placementContext == null) {
@ -565,7 +565,7 @@ public class FairScheduler extends
&& rmApp.getApplicationSubmissionContext().getUnmanagedAM();
SchedulerApplication<FSAppAttempt> application =
new SchedulerApplication<>(queue, user, unmanagedAM);
new SchedulerApplication<>(queue, user, appPriority, unmanagedAM);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user, unmanagedAM);
@ -611,7 +611,8 @@ public class FairScheduler extends
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
FSAppAttempt attempt = new FSAppAttempt(this, applicationAttemptId, user,
queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext);
queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext,
application.getPriority());
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(
application.getCurrentAppAttempt());
@ -1263,7 +1264,9 @@ public class FairScheduler extends
addApplication(appAddedEvent.getApplicationId(),
queueName, appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering(),
appAddedEvent.getPlacementContext());
appAddedEvent.getPlacementContext(),
appAddedEvent.getApplicatonPriority()
);
}
break;
case APP_REMOVED:

View File

@ -181,7 +181,7 @@ public class FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext(queueId);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false,
placementCtx);
placementCtx, Priority.newInstance(1));
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.getSchedulerApplications()
@ -220,7 +220,7 @@ public class FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext(queueId);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false,
placementCtx);
placementCtx, Priority.newInstance(1));
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.getSchedulerApplications().containsKey(
@ -289,7 +289,7 @@ public class FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext(queueId);
scheduler.addApplication(id.getApplicationId(), queueId, userId, true,
placementCtx);
placementCtx, Priority.newInstance(1));
return id;
}

View File

@ -133,7 +133,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("queue11");
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11",
"user11", false, placementCtx);
"user11", false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<>();
ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true));
@ -174,7 +174,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("queue11");
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11",
"user11", false, placementCtx);
"user11", false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<>();
ResourceRequest request =
@ -373,7 +373,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
false, placementCtx);
false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(id11, false, false);
fsAppAttempt = scheduler.getApplicationAttempt(id11);

View File

@ -84,7 +84,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
RMContext rmContext = resourceManager.getRMContext();
FSAppAttempt schedulerApp =
new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue ,
null, rmContext);
null, rmContext, Priority.newInstance(1));
// Default level should be node-local
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
@ -148,7 +148,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSAppAttempt schedulerApp =
new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue,
null, rmContext);
null, rmContext, Priority.newInstance(1));
// Default level should be node-local
assertEquals(NodeType.NODE_LOCAL,
@ -201,7 +201,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSAppAttempt schedulerApp =
new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue ,
null, rmContext);
null, rmContext, Priority.newInstance(1));
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, -1.0, -1.0));
}
@ -245,7 +245,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
RMContext rmContext = resourceManager.getRMContext();
FSAppAttempt schedulerApp =
new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue ,
null, rmContext);
null, rmContext, Priority.newInstance(1));
// Min of Memory and CPU across cluster and queue is used in
// DominantResourceFairnessPolicy
@ -311,7 +311,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("default");
scheduler.addApplication(id11.getApplicationId(),
"default", "user1", false, placementCtx);
"default", "user1", false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(id11, false, false);
assertNotNull(scheduler.getSchedulerApplications().get(id11.
getApplicationId()));
@ -378,7 +378,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
Mockito.when(rmContext.getYarnConfiguration()).thenReturn(conf);
FSAppAttempt schedulerApp =
new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue,
null, rmContext);
null, rmContext, Priority.newInstance(1));
schedulerApp.setAmRunning(false);
FSSchedulerNode schedulerNode = Mockito.mock(FSSchedulerNode.class);

View File

@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
@ -164,7 +165,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
RMContext rmContext = resourceManager.getRMContext();
final FSAppAttempt app =
new FSAppAttempt(scheduler, applicationAttemptId, "user1",
schedulable, null, rmContext);
schedulable, null, rmContext, Priority.newInstance(1));
// this needs to be in sync with the number of runnables declared below
int testThreads = 2;

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
@ -431,7 +432,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
new ApplicationPlacementContext(queueName);
scheduler.addApplication(id11.getApplicationId(), queueName, "user1",
false, placementCtx);
false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(id11, false, false);
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 =
@ -1384,7 +1385,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("queue1");
scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
false, placementCtx);
false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(attemptId, false, false);
List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
@ -2072,7 +2073,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(id11.getApplicationId(),
"root.queue1", "user1", false, placementCtx);
"root.queue1", "user1", false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(id11, false, false);
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 = createResourceRequest(minReqSize * 2,
@ -2086,7 +2087,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createMockRMApp(id21);
placementCtx = new ApplicationPlacementContext("root.queue2");
scheduler.addApplication(id21.getApplicationId(),
"root.queue2", "user1", false, placementCtx);
"root.queue2", "user1", false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(id21, false, false);
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ResourceRequest request2 = createResourceRequest(2 * minReqSize,
@ -2102,7 +2103,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
createMockRMApp(id22);
scheduler.addApplication(id22.getApplicationId(),
"root.queue2", "user1", false, placementCtx);
"root.queue2", "user1", false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(id22, false, false);
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
ResourceRequest request4 = createResourceRequest(minReqSize,
@ -2671,7 +2672,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("queue1");
scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
false, placementCtx);
false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(attemptId, false, false);
// 1 request with 2 nodes on the same rack. another request with 1 node on
@ -3008,7 +3009,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext(queue);
scheduler.addApplication(attId.getApplicationId(), queue, user, false,
placementCtx);
placementCtx, Priority.newInstance(1));
numTries = 0;
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
@ -4324,7 +4325,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
false, placementCtx);
false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(id11, false, false);
List<ResourceRequest> ask1 = new ArrayList<>();
@ -5322,7 +5323,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationPlacementContext placementCtx =
new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(appAttemptId.getApplicationId(), "root.queue1",
"user1", false, placementCtx);
"user1", false, placementCtx, Priority.newInstance(1));
scheduler.addApplicationAttempt(appAttemptId, false, false);
// Create container request that goes to a specific node.

View File

@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -74,7 +75,7 @@ public class TestMaxRunningAppsEnforcer {
ApplicationId appId = ApplicationId.newInstance(0l, appNum++);
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
FSAppAttempt app = new FSAppAttempt(scheduler, attId, user, queue, null,
rmContext);
rmContext, Priority.newInstance(1));
boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, app);
queue.addApp(app, runnable);
if (runnable) {

View File

@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@ -577,7 +578,7 @@ public class TestQueueManager {
// the appAttempt is created
// removeEmptyDynamicQueues() should not remove the queue
FSAppAttempt appAttempt = new FSAppAttempt(scheduler, applicationAttemptId,
"a_user", q, activeUsersManager, rmContext);
"a_user", q, activeUsersManager, rmContext, Priority.newInstance(1));
q.addApp(appAttempt, true);
queueManager.removeEmptyDynamicQueues();
q = queueManager.getLeafQueue("root.leaf1", false);