YARN-3415. Non-AM containers can be counted towards amResourceUsage of a Fair Scheduler queue (Zhihai Xu via Sandy Ryza)
This commit is contained in:
parent
cacadea632
commit
6286cfd711
|
@ -84,6 +84,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3425. NPE from RMNodeLabelsManager.serviceStop when
|
YARN-3425. NPE from RMNodeLabelsManager.serviceStop when
|
||||||
NodeLabelsManager.serviceInit failed. (Bibin A Chundatt via wangda)
|
NodeLabelsManager.serviceInit failed. (Bibin A Chundatt via wangda)
|
||||||
|
|
||||||
|
YARN-3415. Non-AM containers can be counted towards amResourceUsage of a
|
||||||
|
Fair Scheduler queue (Zhihai Xu via Sandy Ryza)
|
||||||
|
|
||||||
Release 2.7.0 - UNRELEASED
|
Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -523,8 +523,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
// Inform the node
|
// Inform the node
|
||||||
node.allocateContainer(allocatedContainer);
|
node.allocateContainer(allocatedContainer);
|
||||||
|
|
||||||
// If this container is used to run AM, update the leaf queue's AM usage
|
// If not running unmanaged, the first container we allocate is always
|
||||||
if (getLiveContainers().size() == 1 && !getUnmanagedAM()) {
|
// the AM. Set the amResource for this app and update the leaf queue's AM
|
||||||
|
// usage
|
||||||
|
if (!isAmRunning() && !getUnmanagedAM()) {
|
||||||
|
setAMResource(container.getResource());
|
||||||
getQueue().addAMResourceUsage(container.getResource());
|
getQueue().addAMResourceUsage(container.getResource());
|
||||||
setAmRunning(true);
|
setAmRunning(true);
|
||||||
}
|
}
|
||||||
|
@ -551,6 +554,19 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
|
LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check the AM resource usage for the leaf queue
|
||||||
|
if (!isAmRunning() && !getUnmanagedAM()) {
|
||||||
|
List<ResourceRequest> ask = appSchedulingInfo.getAllResourceRequests();
|
||||||
|
if (ask.isEmpty() || !getQueue().canRunAppAM(
|
||||||
|
ask.get(0).getCapability())) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Skipping allocation because maxAMShare limit would " +
|
||||||
|
"be exceeded");
|
||||||
|
}
|
||||||
|
return Resources.none();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Collection<Priority> prioritiesToTry = (reserved) ?
|
Collection<Priority> prioritiesToTry = (reserved) ?
|
||||||
Arrays.asList(node.getReservedContainer().getReservedPriority()) :
|
Arrays.asList(node.getReservedContainer().getReservedPriority()) :
|
||||||
getPriorities();
|
getPriorities();
|
||||||
|
@ -567,17 +583,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
|
|
||||||
addSchedulingOpportunity(priority);
|
addSchedulingOpportunity(priority);
|
||||||
|
|
||||||
// Check the AM resource usage for the leaf queue
|
|
||||||
if (getLiveContainers().size() == 0 && !getUnmanagedAM()) {
|
|
||||||
if (!getQueue().canRunAppAM(getAMResource())) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Skipping allocation because maxAMShare limit would " +
|
|
||||||
"be exceeded");
|
|
||||||
}
|
|
||||||
return Resources.none();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ResourceRequest rackLocalRequest = getResourceRequest(priority,
|
ResourceRequest rackLocalRequest = getResourceRequest(priority,
|
||||||
node.getRackName());
|
node.getRackName());
|
||||||
ResourceRequest localRequest = getResourceRequest(priority,
|
ResourceRequest localRequest = getResourceRequest(priority,
|
||||||
|
|
|
@ -124,8 +124,9 @@ public class FSLeafQueue extends FSQueue {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update AM resource usage if needed
|
// Update AM resource usage if needed. If isAMRunning is true, we're not
|
||||||
if (runnable && app.isAmRunning() && app.getAMResource() != null) {
|
// running an unmanaged AM.
|
||||||
|
if (runnable && app.isAmRunning()) {
|
||||||
Resources.subtractFrom(amResourceUsage, app.getAMResource());
|
Resources.subtractFrom(amResourceUsage, app.getAMResource());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -901,12 +901,6 @@ public class FairScheduler extends
|
||||||
// Record container allocation start time
|
// Record container allocation start time
|
||||||
application.recordContainerRequestTime(getClock().getTime());
|
application.recordContainerRequestTime(getClock().getTime());
|
||||||
|
|
||||||
// Set amResource for this app
|
|
||||||
if (!application.getUnmanagedAM() && ask.size() == 1
|
|
||||||
&& application.getLiveContainers().isEmpty()) {
|
|
||||||
application.setAMResource(ask.get(0).getCapability());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Release containers
|
// Release containers
|
||||||
releaseContainers(release, application);
|
releaseContainers(release, application);
|
||||||
|
|
||||||
|
|
|
@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtil
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
@ -3548,8 +3549,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
|
FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
|
||||||
scheduler.update();
|
scheduler.update();
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
assertEquals("Application3's AM requests 1024 MB memory",
|
assertEquals("Application3's AM resource shouldn't be updated",
|
||||||
1024, app3.getAMResource().getMemory());
|
0, app3.getAMResource().getMemory());
|
||||||
assertEquals("Application3's AM should not be running",
|
assertEquals("Application3's AM should not be running",
|
||||||
0, app3.getLiveContainers().size());
|
0, app3.getLiveContainers().size());
|
||||||
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
@ -3574,6 +3575,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
0, app1.getLiveContainers().size());
|
0, app1.getLiveContainers().size());
|
||||||
assertEquals("Application3's AM should be running",
|
assertEquals("Application3's AM should be running",
|
||||||
1, app3.getLiveContainers().size());
|
1, app3.getLiveContainers().size());
|
||||||
|
assertEquals("Application3's AM requests 1024 MB memory",
|
||||||
|
1024, app3.getAMResource().getMemory());
|
||||||
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
2048, queue1.getAmResourceUsage().getMemory());
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
@ -3584,8 +3587,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
|
FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
|
||||||
scheduler.update();
|
scheduler.update();
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
assertEquals("Application4's AM requests 2048 MB memory",
|
assertEquals("Application4's AM resource shouldn't be updated",
|
||||||
2048, app4.getAMResource().getMemory());
|
0, app4.getAMResource().getMemory());
|
||||||
assertEquals("Application4's AM should not be running",
|
assertEquals("Application4's AM should not be running",
|
||||||
0, app4.getLiveContainers().size());
|
0, app4.getLiveContainers().size());
|
||||||
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
@ -3598,8 +3601,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
FSAppAttempt app5 = scheduler.getSchedulerApp(attId5);
|
FSAppAttempt app5 = scheduler.getSchedulerApp(attId5);
|
||||||
scheduler.update();
|
scheduler.update();
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
assertEquals("Application5's AM requests 2048 MB memory",
|
assertEquals("Application5's AM resource shouldn't be updated",
|
||||||
2048, app5.getAMResource().getMemory());
|
0, app5.getAMResource().getMemory());
|
||||||
assertEquals("Application5's AM should not be running",
|
assertEquals("Application5's AM should not be running",
|
||||||
0, app5.getLiveContainers().size());
|
0, app5.getLiveContainers().size());
|
||||||
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
@ -3631,6 +3634,33 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
0, app3.getLiveContainers().size());
|
0, app3.getLiveContainers().size());
|
||||||
assertEquals("Application5's AM should be running",
|
assertEquals("Application5's AM should be running",
|
||||||
1, app5.getLiveContainers().size());
|
1, app5.getLiveContainers().size());
|
||||||
|
assertEquals("Application5's AM requests 2048 MB memory",
|
||||||
|
2048, app5.getAMResource().getMemory());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// request non-AM container for app5
|
||||||
|
createSchedulingRequestExistingApplication(1024, 1, attId5);
|
||||||
|
assertEquals("Application5's AM should have 1 container",
|
||||||
|
1, app5.getLiveContainers().size());
|
||||||
|
// complete AM container before non-AM container is allocated.
|
||||||
|
// spark application hit this situation.
|
||||||
|
RMContainer amContainer5 = (RMContainer)app5.getLiveContainers().toArray()[0];
|
||||||
|
ContainerExpiredSchedulerEvent containerExpired =
|
||||||
|
new ContainerExpiredSchedulerEvent(amContainer5.getContainerId());
|
||||||
|
scheduler.handle(containerExpired);
|
||||||
|
assertEquals("Application5's AM should have 0 container",
|
||||||
|
0, app5.getLiveContainers().size());
|
||||||
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
// non-AM container should be allocated
|
||||||
|
// check non-AM container allocation is not rejected
|
||||||
|
// due to queue MaxAMShare limitation.
|
||||||
|
assertEquals("Application5 should have 1 container",
|
||||||
|
1, app5.getLiveContainers().size());
|
||||||
|
// check non-AM container allocation won't affect queue AmResourceUsage
|
||||||
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
2048, queue1.getAmResourceUsage().getMemory());
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
@ -3643,8 +3673,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
assertEquals("Application6's AM should not be running",
|
assertEquals("Application6's AM should not be running",
|
||||||
0, app6.getLiveContainers().size());
|
0, app6.getLiveContainers().size());
|
||||||
assertEquals("Application6's AM requests 2048 MB memory",
|
assertEquals("Application6's AM resource shouldn't be updated",
|
||||||
2048, app6.getAMResource().getMemory());
|
0, app6.getAMResource().getMemory());
|
||||||
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
|
||||||
2048, queue1.getAmResourceUsage().getMemory());
|
2048, queue1.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
@ -3748,8 +3778,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
|
FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
|
||||||
scheduler.update();
|
scheduler.update();
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
assertEquals("Application2's AM requests 1024 MB memory",
|
assertEquals("Application2's AM resource shouldn't be updated",
|
||||||
1024, app2.getAMResource().getMemory());
|
0, app2.getAMResource().getMemory());
|
||||||
assertEquals("Application2's AM should not be running",
|
assertEquals("Application2's AM should not be running",
|
||||||
0, app2.getLiveContainers().size());
|
0, app2.getLiveContainers().size());
|
||||||
assertEquals("Queue2's AM resource usage should be 0 MB memory",
|
assertEquals("Queue2's AM resource usage should be 0 MB memory",
|
||||||
|
|
Loading…
Reference in New Issue