YARN-2128. FairScheduler: Incorrect calculation of amResource usage. (Wei Yan via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1601050 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
969bef3c77
commit
85d4c787e0
|
@ -208,6 +208,9 @@ Release 2.5.0 - UNRELEASED
|
||||||
YARN-2121. Fixed NPE handling in Timeline Server's TimelineAuthenticator.
|
YARN-2121. Fixed NPE handling in Timeline Server's TimelineAuthenticator.
|
||||||
(Zhijie Shen via vinodkv)
|
(Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
|
YARN-2128. FairScheduler: Incorrect calculation of amResource usage.
|
||||||
|
(Wei Yan via kasha)
|
||||||
|
|
||||||
Release 2.4.1 - UNRELEASED
|
Release 2.4.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -79,6 +79,7 @@ public class SchedulerApplicationAttempt {
|
||||||
protected Resource currentConsumption = Resource.newInstance(0, 0);
|
protected Resource currentConsumption = Resource.newInstance(0, 0);
|
||||||
private Resource amResource;
|
private Resource amResource;
|
||||||
private boolean unmanagedAM = true;
|
private boolean unmanagedAM = true;
|
||||||
|
private boolean amRunning = false;
|
||||||
|
|
||||||
protected List<RMContainer> newlyAllocatedContainers =
|
protected List<RMContainer> newlyAllocatedContainers =
|
||||||
new ArrayList<RMContainer>();
|
new ArrayList<RMContainer>();
|
||||||
|
@ -110,7 +111,6 @@ public class SchedulerApplicationAttempt {
|
||||||
activeUsersManager);
|
activeUsersManager);
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
|
|
||||||
|
|
||||||
if (rmContext != null && rmContext.getRMApps() != null &&
|
if (rmContext != null && rmContext.getRMApps() != null &&
|
||||||
rmContext.getRMApps()
|
rmContext.getRMApps()
|
||||||
.containsKey(applicationAttemptId.getApplicationId())) {
|
.containsKey(applicationAttemptId.getApplicationId())) {
|
||||||
|
@ -118,7 +118,6 @@ public class SchedulerApplicationAttempt {
|
||||||
rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
|
rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
|
||||||
.getApplicationSubmissionContext();
|
.getApplicationSubmissionContext();
|
||||||
if (appSubmissionContext != null) {
|
if (appSubmissionContext != null) {
|
||||||
amResource = appSubmissionContext.getResource();
|
|
||||||
unmanagedAM = appSubmissionContext.getUnmanagedAM();
|
unmanagedAM = appSubmissionContext.getUnmanagedAM();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -188,6 +187,18 @@ public class SchedulerApplicationAttempt {
|
||||||
return amResource;
|
return amResource;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setAMResource(Resource amResource) {
|
||||||
|
this.amResource = amResource;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAmRunning() {
|
||||||
|
return amRunning;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAmRunning(boolean bool) {
|
||||||
|
amRunning = bool;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean getUnmanagedAM() {
|
public boolean getUnmanagedAM() {
|
||||||
return unmanagedAM;
|
return unmanagedAM;
|
||||||
}
|
}
|
||||||
|
|
|
@ -271,6 +271,7 @@ public class AppSchedulable extends Schedulable {
|
||||||
if (app.getLiveContainers().size() == 1 &&
|
if (app.getLiveContainers().size() == 1 &&
|
||||||
!app.getUnmanagedAM()) {
|
!app.getUnmanagedAM()) {
|
||||||
queue.addAMResourceUsage(container.getResource());
|
queue.addAMResourceUsage(container.getResource());
|
||||||
|
app.setAmRunning(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return container.getResource();
|
return container.getResource();
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class FSLeafQueue extends FSQueue {
|
||||||
public boolean removeApp(FSSchedulerApp app) {
|
public boolean removeApp(FSSchedulerApp app) {
|
||||||
if (runnableAppScheds.remove(app.getAppSchedulable())) {
|
if (runnableAppScheds.remove(app.getAppSchedulable())) {
|
||||||
// Update AM resource usage
|
// Update AM resource usage
|
||||||
if (app.getAMResource() != null) {
|
if (app.isAmRunning() && app.getAMResource() != null) {
|
||||||
Resources.subtractFrom(amResourceUsage, app.getAMResource());
|
Resources.subtractFrom(amResourceUsage, app.getAMResource());
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -153,6 +153,10 @@ public class FSLeafQueue extends FSQueue {
|
||||||
return usage;
|
return usage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Resource getAmResourceUsage() {
|
||||||
|
return amResourceUsage;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateDemand() {
|
public void updateDemand() {
|
||||||
// Compute demand by iterating through apps in the queue
|
// Compute demand by iterating through apps in the queue
|
||||||
|
|
|
@ -836,6 +836,12 @@ public class FairScheduler extends
|
||||||
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
|
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
|
||||||
clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
|
clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
|
||||||
|
|
||||||
|
// Set amResource for this app
|
||||||
|
if (!application.getUnmanagedAM() && ask.size() == 1
|
||||||
|
&& application.getLiveContainers().isEmpty()) {
|
||||||
|
application.setAMResource(ask.get(0).getCapability());
|
||||||
|
}
|
||||||
|
|
||||||
// Release containers
|
// Release containers
|
||||||
for (ContainerId releasedContainerId : release) {
|
for (ContainerId releasedContainerId : release) {
|
||||||
RMContainer rmContainer = getRMContainer(releasedContainerId);
|
RMContainer rmContainer = getRMContainer(releasedContainerId);
|
||||||
|
|
|
@ -2328,12 +2328,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
scheduler.handle(nodeEvent);
|
scheduler.handle(nodeEvent);
|
||||||
scheduler.update();
|
scheduler.update();
|
||||||
|
|
||||||
|
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
|
||||||
assertEquals("Queue queue1's fair share should be 10240",
|
assertEquals("Queue queue1's fair share should be 10240",
|
||||||
10240, scheduler.getQueueManager().getLeafQueue("queue1", true)
|
10240, queue1.getFairShare().getMemory());
|
||||||
.getFairShare().getMemory());
|
|
||||||
|
|
||||||
Resource amResource1 = Resource.newInstance(1024, 1);
|
Resource amResource1 = Resource.newInstance(1024, 1);
|
||||||
Resource amResource2 = Resource.newInstance(2048, 2);
|
Resource amResource2 = Resource.newInstance(2048, 2);
|
||||||
|
Resource amResource3 = Resource.newInstance(1860, 2);
|
||||||
int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
|
int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
|
||||||
// Exceeds no limits
|
// Exceeds no limits
|
||||||
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
|
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
|
||||||
|
@ -2346,6 +2347,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
1024, app1.getAMResource().getMemory());
|
1024, app1.getAMResource().getMemory());
|
||||||
assertEquals("Application1's AM should be running",
|
assertEquals("Application1's AM should be running",
|
||||||
1, app1.getLiveContainers().size());
|
1, app1.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 1024 MB memory",
|
||||||
|
1024, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
// Exceeds no limits
|
// Exceeds no limits
|
||||||
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
|
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
|
||||||
|
@ -2358,6 +2361,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
1024, app2.getAMResource().getMemory());
|
1024, app2.getAMResource().getMemory());
|
||||||
assertEquals("Application2's AM should be running",
|
assertEquals("Application2's AM should be running",
|
||||||
1, app2.getLiveContainers().size());
|
1, app2.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
// Exceeds queue limit
|
// Exceeds queue limit
|
||||||
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
|
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
|
||||||
|
@ -2370,6 +2375,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
1024, app3.getAMResource().getMemory());
|
1024, app3.getAMResource().getMemory());
|
||||||
assertEquals("Application3's AM should not be running",
|
assertEquals("Application3's AM should not be running",
|
||||||
0, app3.getLiveContainers().size());
|
0, app3.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
// Still can run non-AM container
|
// Still can run non-AM container
|
||||||
createSchedulingRequestExistingApplication(1024, 1, attId1);
|
createSchedulingRequestExistingApplication(1024, 1, attId1);
|
||||||
|
@ -2377,6 +2384,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
assertEquals("Application1 should have two running containers",
|
assertEquals("Application1 should have two running containers",
|
||||||
2, app1.getLiveContainers().size());
|
2, app1.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
// Remove app1, app3's AM should become running
|
// Remove app1, app3's AM should become running
|
||||||
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
||||||
|
@ -2388,6 +2397,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
0, app1.getLiveContainers().size());
|
0, app1.getLiveContainers().size());
|
||||||
assertEquals("Application3's AM should be running",
|
assertEquals("Application3's AM should be running",
|
||||||
1, app3.getLiveContainers().size());
|
1, app3.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
// Exceeds queue limit
|
// Exceeds queue limit
|
||||||
ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
|
ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
|
||||||
|
@ -2400,8 +2411,35 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
2048, app4.getAMResource().getMemory());
|
2048, app4.getAMResource().getMemory());
|
||||||
assertEquals("Application4's AM should not be running",
|
assertEquals("Application4's AM should not be running",
|
||||||
0, app4.getLiveContainers().size());
|
0, app4.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
// Remove app2 and app3, app4's AM should become running
|
// Exceeds queue limit
|
||||||
|
ApplicationAttemptId attId5 = createAppAttemptId(5, 1);
|
||||||
|
createApplicationWithAMResource(attId5, "queue1", "user1", amResource2);
|
||||||
|
createSchedulingRequestExistingApplication(2048, 2, amPriority, attId5);
|
||||||
|
FSSchedulerApp app5 = scheduler.getSchedulerApp(attId5);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
assertEquals("Application5's AM requests 2048 MB memory",
|
||||||
|
2048, app5.getAMResource().getMemory());
|
||||||
|
assertEquals("Application5's AM should not be running",
|
||||||
|
0, app5.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// Remove un-running app doesn't affect others
|
||||||
|
AppAttemptRemovedSchedulerEvent appRemovedEvent4 =
|
||||||
|
new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.KILLED, false);
|
||||||
|
scheduler.handle(appRemovedEvent4);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
assertEquals("Application5's AM should not be running",
|
||||||
|
0, app5.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// Remove app2 and app3, app5's AM should become running
|
||||||
AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
|
AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
|
||||||
new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false);
|
new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false);
|
||||||
AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
|
AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
|
||||||
|
@ -2414,8 +2452,35 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
0, app2.getLiveContainers().size());
|
0, app2.getLiveContainers().size());
|
||||||
assertEquals("Application3's AM should be finished",
|
assertEquals("Application3's AM should be finished",
|
||||||
0, app3.getLiveContainers().size());
|
0, app3.getLiveContainers().size());
|
||||||
assertEquals("Application4's AM should be running",
|
assertEquals("Application5's AM should be running",
|
||||||
1, app4.getLiveContainers().size());
|
1, app5.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// Check amResource normalization
|
||||||
|
ApplicationAttemptId attId6 = createAppAttemptId(6, 1);
|
||||||
|
createApplicationWithAMResource(attId6, "queue1", "user1", amResource3);
|
||||||
|
createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6);
|
||||||
|
FSSchedulerApp app6 = scheduler.getSchedulerApp(attId6);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
assertEquals("Application6's AM should not be running",
|
||||||
|
0, app6.getLiveContainers().size());
|
||||||
|
assertEquals("Application6's AM requests 2048 MB memory",
|
||||||
|
2048, app6.getAMResource().getMemory());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// Remove all apps
|
||||||
|
AppAttemptRemovedSchedulerEvent appRemovedEvent5 =
|
||||||
|
new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.FINISHED, false);
|
||||||
|
AppAttemptRemovedSchedulerEvent appRemovedEvent6 =
|
||||||
|
new AppAttemptRemovedSchedulerEvent(attId6, RMAppAttemptState.FINISHED, false);
|
||||||
|
scheduler.handle(appRemovedEvent5);
|
||||||
|
scheduler.handle(appRemovedEvent6);
|
||||||
|
scheduler.update();
|
||||||
|
assertEquals("Queue1's AM resource usage should be 0",
|
||||||
|
0, queue1.getAmResourceUsage().getMemory());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue