YARN-5082. Limit ContainerId increase in fair scheduler if the num of node app reserved reached the limit (sandflee via asuresh)
This commit is contained in:
parent
e0f4620cc7
commit
5279af7cd4
|
@ -73,7 +73,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
= new DefaultResourceCalculator();
|
= new DefaultResourceCalculator();
|
||||||
|
|
||||||
private long startTime;
|
private long startTime;
|
||||||
private Priority priority;
|
private Priority appPriority;
|
||||||
private ResourceWeights resourceWeights;
|
private ResourceWeights resourceWeights;
|
||||||
private Resource demand = Resources.createResource(0);
|
private Resource demand = Resources.createResource(0);
|
||||||
private FairScheduler scheduler;
|
private FairScheduler scheduler;
|
||||||
|
@ -107,7 +107,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
|
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
this.startTime = scheduler.getClock().getTime();
|
this.startTime = scheduler.getClock().getTime();
|
||||||
this.priority = Priority.newInstance(1);
|
this.appPriority = Priority.newInstance(1);
|
||||||
this.resourceWeights = new ResourceWeights();
|
this.resourceWeights = new ResourceWeights();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,7 +309,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
}
|
}
|
||||||
|
|
||||||
// default level is NODE_LOCAL
|
// default level is NODE_LOCAL
|
||||||
if (! allowedLocalityLevel.containsKey(priority)) {
|
if (!allowedLocalityLevel.containsKey(priority)) {
|
||||||
// add the initial time of priority to prevent comparing with FsApp
|
// add the initial time of priority to prevent comparing with FsApp
|
||||||
// startTime and allowedLocalityLevel degrade
|
// startTime and allowedLocalityLevel degrade
|
||||||
lastScheduledContainer.put(priority, currentTimeMs);
|
lastScheduledContainer.put(priority, currentTimeMs);
|
||||||
|
@ -353,7 +353,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
|
|
||||||
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
|
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
|
||||||
Priority priority, ResourceRequest request,
|
Priority priority, ResourceRequest request,
|
||||||
Container container) {
|
Container reservedContainer) {
|
||||||
// Update allowed locality level
|
// Update allowed locality level
|
||||||
NodeType allowed = allowedLocalityLevel.get(priority);
|
NodeType allowed = allowedLocalityLevel.get(priority);
|
||||||
if (allowed != null) {
|
if (allowed != null) {
|
||||||
|
@ -373,9 +373,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
if (getTotalRequiredResources(priority) <= 0) {
|
if (getTotalRequiredResources(priority) <= 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Container container = reservedContainer;
|
||||||
|
if (container == null) {
|
||||||
|
container =
|
||||||
|
createContainer(node, request.getCapability(), request.getPriority());
|
||||||
|
}
|
||||||
|
|
||||||
// Create RMContainer
|
// Create RMContainer
|
||||||
RMContainer rmContainer = new RMContainerImpl(container,
|
RMContainer rmContainer = new RMContainerImpl(container,
|
||||||
getApplicationAttemptId(), node.getNodeID(),
|
getApplicationAttemptId(), node.getNodeID(),
|
||||||
appSchedulingInfo.getUser(), rmContext);
|
appSchedulingInfo.getUser(), rmContext);
|
||||||
((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
|
((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
|
||||||
|
@ -485,21 +491,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
* in {@link FSSchedulerNode}..
|
* in {@link FSSchedulerNode}..
|
||||||
* return whether reservation was possible with the current threshold limits
|
* return whether reservation was possible with the current threshold limits
|
||||||
*/
|
*/
|
||||||
private boolean reserve(Priority priority, FSSchedulerNode node,
|
private boolean reserve(ResourceRequest request, FSSchedulerNode node,
|
||||||
Container container, NodeType type, boolean alreadyReserved) {
|
Container reservedContainer, NodeType type) {
|
||||||
|
|
||||||
|
Priority priority = request.getPriority();
|
||||||
if (!reservationExceedsThreshold(node, type)) {
|
if (!reservationExceedsThreshold(node, type)) {
|
||||||
LOG.info("Making reservation: node=" + node.getNodeName() +
|
LOG.info("Making reservation: node=" + node.getNodeName() +
|
||||||
" app_id=" + getApplicationId());
|
" app_id=" + getApplicationId());
|
||||||
if (!alreadyReserved) {
|
if (reservedContainer == null) {
|
||||||
getMetrics().reserveResource(getUser(), container.getResource());
|
reservedContainer =
|
||||||
|
createContainer(node, request.getCapability(),
|
||||||
|
request.getPriority());
|
||||||
|
getMetrics().reserveResource(getUser(),
|
||||||
|
reservedContainer.getResource());
|
||||||
RMContainer rmContainer =
|
RMContainer rmContainer =
|
||||||
super.reserve(node, priority, null, container);
|
super.reserve(node, priority, null, reservedContainer);
|
||||||
node.reserveResource(this, priority, rmContainer);
|
node.reserveResource(this, priority, rmContainer);
|
||||||
setReservation(node);
|
setReservation(node);
|
||||||
} else {
|
} else {
|
||||||
RMContainer rmContainer = node.getReservedContainer();
|
RMContainer rmContainer = node.getReservedContainer();
|
||||||
super.reserve(node, priority, rmContainer, container);
|
super.reserve(node, priority, rmContainer, reservedContainer);
|
||||||
node.reserveResource(this, priority, rmContainer);
|
node.reserveResource(this, priority, rmContainer);
|
||||||
setReservation(node);
|
setReservation(node);
|
||||||
}
|
}
|
||||||
|
@ -615,18 +626,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
// How much does the node have?
|
// How much does the node have?
|
||||||
Resource available = node.getUnallocatedResource();
|
Resource available = node.getUnallocatedResource();
|
||||||
|
|
||||||
Container container = null;
|
Container reservedContainer = null;
|
||||||
if (reserved) {
|
if (reserved) {
|
||||||
container = node.getReservedContainer().getContainer();
|
reservedContainer = node.getReservedContainer().getContainer();
|
||||||
} else {
|
|
||||||
container = createContainer(node, capability, request.getPriority());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Can we allocate a container on this node?
|
// Can we allocate a container on this node?
|
||||||
if (Resources.fitsIn(capability, available)) {
|
if (Resources.fitsIn(capability, available)) {
|
||||||
// Inform the application of the new container for this request
|
// Inform the application of the new container for this request
|
||||||
RMContainer allocatedContainer =
|
RMContainer allocatedContainer =
|
||||||
allocate(type, node, request.getPriority(), request, container);
|
allocate(type, node, request.getPriority(), request,
|
||||||
|
reservedContainer);
|
||||||
if (allocatedContainer == null) {
|
if (allocatedContainer == null) {
|
||||||
// Did the application need this resource?
|
// Did the application need this resource?
|
||||||
if (reserved) {
|
if (reserved) {
|
||||||
|
@ -647,30 +657,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
// the AM. Set the amResource for this app and update the leaf queue's AM
|
// the AM. Set the amResource for this app and update the leaf queue's AM
|
||||||
// usage
|
// usage
|
||||||
if (!isAmRunning() && !getUnmanagedAM()) {
|
if (!isAmRunning() && !getUnmanagedAM()) {
|
||||||
setAMResource(container.getResource());
|
setAMResource(capability);
|
||||||
getQueue().addAMResourceUsage(container.getResource());
|
getQueue().addAMResourceUsage(capability);
|
||||||
setAmRunning(true);
|
setAmRunning(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return container.getResource();
|
return capability;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The desired container won't fit here, so reserve
|
// The desired container won't fit here, so reserve
|
||||||
if (isReservable(container) &&
|
if (isReservable(capability) &&
|
||||||
reserve(request.getPriority(), node, container, type, reserved)) {
|
reserve(request, node, reservedContainer, type)) {
|
||||||
return FairScheduler.CONTAINER_RESERVED;
|
return FairScheduler.CONTAINER_RESERVED;
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Not creating reservation as container " + container.getId()
|
LOG.debug("Couldn't creating reservation for " +
|
||||||
+ " is not reservable");
|
getName() + ",at priority " + request.getPriority());
|
||||||
}
|
}
|
||||||
return Resources.none();
|
return Resources.none();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isReservable(Container container) {
|
private boolean isReservable(Resource capacity) {
|
||||||
return scheduler.isAtLeastReservationThreshold(
|
return scheduler.isAtLeastReservationThreshold(
|
||||||
getQueue().getPolicy().getResourceCalculator(), container.getResource());
|
getQueue().getPolicy().getResourceCalculator(), capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean hasNodeOrRackLocalRequests(Priority priority) {
|
private boolean hasNodeOrRackLocalRequests(Priority priority) {
|
||||||
|
@ -907,7 +917,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
public Priority getPriority() {
|
public Priority getPriority() {
|
||||||
// Right now per-app priorities are not passed to scheduler,
|
// Right now per-app priorities are not passed to scheduler,
|
||||||
// so everyone has the same priority.
|
// so everyone has the same priority.
|
||||||
return priority;
|
return appPriority;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -4480,4 +4480,76 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
|
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
|
||||||
return nm;
|
return nm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testContainerAllocationWithContainerIdLeap() throws Exception {
|
||||||
|
conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// Add two node
|
||||||
|
RMNode node1 = MockNodes.newNodeInfo(1,
|
||||||
|
Resources.createResource(3072, 10), 1, "127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
|
||||||
|
RMNode node2 = MockNodes.newNodeInfo(1,
|
||||||
|
Resources.createResource(3072, 10), 1, "127.0.0.2");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeEvent2);
|
||||||
|
|
||||||
|
ApplicationAttemptId app1 =
|
||||||
|
createSchedulingRequest(2048, "queue1", "user1", 2);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
|
||||||
|
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
|
||||||
|
|
||||||
|
ApplicationAttemptId app2 =
|
||||||
|
createSchedulingRequest(2048, "queue1", "user1", 1);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
|
||||||
|
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
|
||||||
|
|
||||||
|
assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
|
||||||
|
getResourceUsage().getMemory());
|
||||||
|
|
||||||
|
//container will be reserved at node1
|
||||||
|
RMContainer reservedContainer1 =
|
||||||
|
scheduler.getSchedulerNode(node1.getNodeID()).getReservedContainer();
|
||||||
|
assertNotEquals(reservedContainer1, null);
|
||||||
|
RMContainer reservedContainer2 =
|
||||||
|
scheduler.getSchedulerNode(node2.getNodeID()).getReservedContainer();
|
||||||
|
assertEquals(reservedContainer2, null);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
|
||||||
|
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
|
||||||
|
}
|
||||||
|
|
||||||
|
// release resource
|
||||||
|
scheduler.handle(new AppAttemptRemovedSchedulerEvent(
|
||||||
|
app1, RMAppAttemptState.KILLED, false));
|
||||||
|
|
||||||
|
assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
|
||||||
|
getResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// container will be allocated at node2
|
||||||
|
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
|
||||||
|
assertEquals(scheduler.getSchedulerApp(app2).
|
||||||
|
getLiveContainers().size(), 1);
|
||||||
|
|
||||||
|
long maxId = 0;
|
||||||
|
for (RMContainer container :
|
||||||
|
scheduler.getSchedulerApp(app2).getLiveContainers()) {
|
||||||
|
assertTrue(
|
||||||
|
container.getContainer().getNodeId().equals(node2.getNodeID()));
|
||||||
|
if (container.getContainerId().getContainerId() > maxId) {
|
||||||
|
maxId = container.getContainerId().getContainerId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long reservedId = reservedContainer1.getContainerId().getContainerId();
|
||||||
|
assertEquals(reservedId + 1, maxId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue