YARN-5082. Limit ContainerId increase in fair scheduler if the number of nodes reserved by the app has reached the limit (sandflee via asuresh)

(cherry picked from commit 5279af7cd4)
This commit is contained in:
Arun Suresh 2016-06-10 22:33:42 -07:00
parent efdfa68911
commit 5985221b46
2 changed files with 108 additions and 26 deletions

View File

@ -73,7 +73,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
= new DefaultResourceCalculator(); = new DefaultResourceCalculator();
private long startTime; private long startTime;
private Priority priority; private Priority appPriority;
private ResourceWeights resourceWeights; private ResourceWeights resourceWeights;
private Resource demand = Resources.createResource(0); private Resource demand = Resources.createResource(0);
private FairScheduler scheduler; private FairScheduler scheduler;
@ -107,7 +107,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
this.scheduler = scheduler; this.scheduler = scheduler;
this.startTime = scheduler.getClock().getTime(); this.startTime = scheduler.getClock().getTime();
this.priority = Priority.newInstance(1); this.appPriority = Priority.newInstance(1);
this.resourceWeights = new ResourceWeights(); this.resourceWeights = new ResourceWeights();
} }
@ -309,7 +309,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
} }
// default level is NODE_LOCAL // default level is NODE_LOCAL
if (! allowedLocalityLevel.containsKey(priority)) { if (!allowedLocalityLevel.containsKey(priority)) {
// add the initial time of priority to prevent comparing with FsApp // add the initial time of priority to prevent comparing with FsApp
// startTime and allowedLocalityLevel degrade // startTime and allowedLocalityLevel degrade
lastScheduledContainer.put(priority, currentTimeMs); lastScheduledContainer.put(priority, currentTimeMs);
@ -353,7 +353,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
Priority priority, ResourceRequest request, Priority priority, ResourceRequest request,
Container container) { Container reservedContainer) {
// Update allowed locality level // Update allowed locality level
NodeType allowed = allowedLocalityLevel.get(priority); NodeType allowed = allowedLocalityLevel.get(priority);
if (allowed != null) { if (allowed != null) {
@ -374,6 +374,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return null; return null;
} }
Container container = reservedContainer;
if (container == null) {
container =
createContainer(node, request.getCapability(), request.getPriority());
}
// Create RMContainer // Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container, RMContainer rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(), getApplicationAttemptId(), node.getNodeID(),
@ -485,21 +491,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* in {@link FSSchedulerNode}. * in {@link FSSchedulerNode}.
* return whether reservation was possible with the current threshold limits * return whether reservation was possible with the current threshold limits
*/ */
private boolean reserve(Priority priority, FSSchedulerNode node, private boolean reserve(ResourceRequest request, FSSchedulerNode node,
Container container, NodeType type, boolean alreadyReserved) { Container reservedContainer, NodeType type) {
Priority priority = request.getPriority();
if (!reservationExceedsThreshold(node, type)) { if (!reservationExceedsThreshold(node, type)) {
LOG.info("Making reservation: node=" + node.getNodeName() + LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + getApplicationId()); " app_id=" + getApplicationId());
if (!alreadyReserved) { if (reservedContainer == null) {
getMetrics().reserveResource(getUser(), container.getResource()); reservedContainer =
createContainer(node, request.getCapability(),
request.getPriority());
getMetrics().reserveResource(getUser(),
reservedContainer.getResource());
RMContainer rmContainer = RMContainer rmContainer =
super.reserve(node, priority, null, container); super.reserve(node, priority, null, reservedContainer);
node.reserveResource(this, priority, rmContainer); node.reserveResource(this, priority, rmContainer);
setReservation(node); setReservation(node);
} else { } else {
RMContainer rmContainer = node.getReservedContainer(); RMContainer rmContainer = node.getReservedContainer();
super.reserve(node, priority, rmContainer, container); super.reserve(node, priority, rmContainer, reservedContainer);
node.reserveResource(this, priority, rmContainer); node.reserveResource(this, priority, rmContainer);
setReservation(node); setReservation(node);
} }
@ -615,18 +626,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// How much does the node have? // How much does the node have?
Resource available = node.getUnallocatedResource(); Resource available = node.getUnallocatedResource();
Container container = null; Container reservedContainer = null;
if (reserved) { if (reserved) {
container = node.getReservedContainer().getContainer(); reservedContainer = node.getReservedContainer().getContainer();
} else {
container = createContainer(node, capability, request.getPriority());
} }
// Can we allocate a container on this node? // Can we allocate a container on this node?
if (Resources.fitsIn(capability, available)) { if (Resources.fitsIn(capability, available)) {
// Inform the application of the new container for this request // Inform the application of the new container for this request
RMContainer allocatedContainer = RMContainer allocatedContainer =
allocate(type, node, request.getPriority(), request, container); allocate(type, node, request.getPriority(), request,
reservedContainer);
if (allocatedContainer == null) { if (allocatedContainer == null) {
// Did the application need this resource? // Did the application need this resource?
if (reserved) { if (reserved) {
@ -647,30 +657,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// the AM. Set the amResource for this app and update the leaf queue's AM // the AM. Set the amResource for this app and update the leaf queue's AM
// usage // usage
if (!isAmRunning() && !getUnmanagedAM()) { if (!isAmRunning() && !getUnmanagedAM()) {
setAMResource(container.getResource()); setAMResource(capability);
getQueue().addAMResourceUsage(container.getResource()); getQueue().addAMResourceUsage(capability);
setAmRunning(true); setAmRunning(true);
} }
return container.getResource(); return capability;
} }
// The desired container won't fit here, so reserve // The desired container won't fit here, so reserve
if (isReservable(container) && if (isReservable(capability) &&
reserve(request.getPriority(), node, container, type, reserved)) { reserve(request, node, reservedContainer, type)) {
return FairScheduler.CONTAINER_RESERVED; return FairScheduler.CONTAINER_RESERVED;
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Not creating reservation as container " + container.getId() LOG.debug("Couldn't creating reservation for " +
+ " is not reservable"); getName() + ",at priority " + request.getPriority());
} }
return Resources.none(); return Resources.none();
} }
} }
private boolean isReservable(Container container) { private boolean isReservable(Resource capacity) {
return scheduler.isAtLeastReservationThreshold( return scheduler.isAtLeastReservationThreshold(
getQueue().getPolicy().getResourceCalculator(), container.getResource()); getQueue().getPolicy().getResourceCalculator(), capacity);
} }
private boolean hasNodeOrRackLocalRequests(Priority priority) { private boolean hasNodeOrRackLocalRequests(Priority priority) {
@ -907,7 +917,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
public Priority getPriority() { public Priority getPriority() {
// Right now per-app priorities are not passed to scheduler, // Right now per-app priorities are not passed to scheduler,
// so everyone has the same priority. // so everyone has the same priority.
return priority; return appPriority;
} }
@Override @Override

View File

@ -4480,4 +4480,76 @@ public class TestFairScheduler extends FairSchedulerTestBase {
resourceManager.getResourceScheduler().handle(nodeAddEvent1); resourceManager.getResourceScheduler().handle(nodeAddEvent1);
return nm; return nm;
} }
/**
 * Regression test for YARN-5082: repeated scheduling passes over a node that
 * already holds a reservation must not keep consuming container IDs.
 *
 * Scenario: two 3072 MB nodes; app1 fills 2048 MB on each, so app2's
 * 2048 MB request can only be reserved (on node1). After ten more rounds of
 * node heartbeats app1 is removed, app2's container is allocated on node2,
 * and its container ID must be exactly one past the ID of the reservation.
 */
@Test(timeout = 120000)
public void testContainerAllocationWithContainerIdLeap() throws Exception {
  conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
  scheduler.init(conf);
  scheduler.start();
  scheduler.reinitialize(conf, resourceManager.getRMContext());

  // Add two nodes
  RMNode node1 = MockNodes.newNodeInfo(1,
      Resources.createResource(3072, 10), 1, "127.0.0.1");
  NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
  scheduler.handle(nodeEvent1);

  RMNode node2 = MockNodes.newNodeInfo(1,
      Resources.createResource(3072, 10), 1, "127.0.0.2");
  NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
  scheduler.handle(nodeEvent2);

  // app1's request is satisfied on both nodes (4096 MB in total, asserted
  // below), leaving too little room on either node for another 2048 MB.
  ApplicationAttemptId app1 =
      createSchedulingRequest(2048, "queue1", "user1", 2);
  scheduler.update();
  scheduler.handle(new NodeUpdateSchedulerEvent(node1));
  scheduler.handle(new NodeUpdateSchedulerEvent(node2));

  ApplicationAttemptId app2 =
      createSchedulingRequest(2048, "queue1", "user1", 1);
  scheduler.update();
  scheduler.handle(new NodeUpdateSchedulerEvent(node1));
  scheduler.handle(new NodeUpdateSchedulerEvent(node2));

  assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
      getResourceUsage().getMemory());

  // container will be reserved at node1
  RMContainer reservedContainer1 =
      scheduler.getSchedulerNode(node1.getNodeID()).getReservedContainer();
  assertNotEquals(null, reservedContainer1);
  RMContainer reservedContainer2 =
      scheduler.getSchedulerNode(node2.getNodeID()).getReservedContainer();
  assertEquals(null, reservedContainer2);

  // Extra heartbeats while the reservation is outstanding; these must not
  // advance app2's container ID counter.
  for (int i = 0; i < 10; i++) {
    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
  }

  // release resource
  scheduler.handle(new AppAttemptRemovedSchedulerEvent(
      app1, RMAppAttemptState.KILLED, false));

  assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
      getResourceUsage().getMemory());

  // container will be allocated at node2
  scheduler.handle(new NodeUpdateSchedulerEvent(node2));
  assertEquals(1,
      scheduler.getSchedulerApp(app2).getLiveContainers().size());

  long maxId = 0;
  for (RMContainer container :
      scheduler.getSchedulerApp(app2).getLiveContainers()) {
    assertEquals(node2.getNodeID(), container.getContainer().getNodeId());
    maxId = Math.max(maxId, container.getContainerId().getContainerId());
  }

  // Only one new ID may have been handed out since the reservation was made.
  long reservedId = reservedContainer1.getContainerId().getContainerId();
  assertEquals(reservedId + 1, maxId);
}
} }