YARN-6210. FairScheduler: Node reservations can interfere with preemption. (kasha)

(cherry picked from commit 718ad9f6ee)
This commit is contained in:
Karthik Kambatla 2017-02-22 15:45:45 -08:00
parent 4c883f331c
commit ca7a6a7365
8 changed files with 178 additions and 108 deletions

View File

@ -30,7 +30,8 @@ public class DefaultResourceCalculator extends ResourceCalculator {
LogFactory.getLog(DefaultResourceCalculator.class);
@Override
public int compare(Resource unused, Resource lhs, Resource rhs) {
public int compare(Resource unused, Resource lhs, Resource rhs,
boolean singleType) {
// Only consider memory
return Long.compare(lhs.getMemorySize(), rhs.getMemorySize());
}

View File

@ -51,17 +51,18 @@ public class DominantResourceCalculator extends ResourceCalculator {
LogFactory.getLog(DominantResourceCalculator.class);
@Override
public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
public int compare(Resource clusterResource, Resource lhs, Resource rhs,
boolean singleType) {
if (lhs.equals(rhs)) {
return 0;
}
if (isInvalidDivisor(clusterResource)) {
if ((lhs.getMemorySize() < rhs.getMemorySize() && lhs.getVirtualCores() > rhs
.getVirtualCores())
|| (lhs.getMemorySize() > rhs.getMemorySize() && lhs.getVirtualCores() < rhs
.getVirtualCores())) {
if ((lhs.getMemorySize() < rhs.getMemorySize() &&
lhs.getVirtualCores() > rhs.getVirtualCores()) ||
(lhs.getMemorySize() > rhs.getMemorySize() &&
lhs.getVirtualCores() < rhs.getVirtualCores())) {
return 0;
} else if (lhs.getMemorySize() > rhs.getMemorySize()
|| lhs.getVirtualCores() > rhs.getVirtualCores()) {
@ -79,7 +80,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
return -1;
} else if (l > r) {
return 1;
} else {
} else if (!singleType) {
l = getResourceAsValue(clusterResource, lhs, false);
r = getResourceAsValue(clusterResource, rhs, false);
if (l < r) {

View File

@ -28,8 +28,36 @@ import org.apache.hadoop.yarn.api.records.Resource;
@Unstable
public abstract class ResourceCalculator {
public abstract int
compare(Resource clusterResource, Resource lhs, Resource rhs);
/**
* On a cluster with capacity {@code clusterResource}, compare {@code lhs}
* and {@code rhs}. Consider all resources unless {@code singleType} is set
* to true. When {@code singleType} is set to true, consider only one
* resource as per the {@link ResourceCalculator} implementation; the
* {@link DefaultResourceCalculator} considers memory and
* {@link DominantResourceCalculator} considers the dominant resource.
*
* @param clusterResource cluster capacity
* @param lhs First {@link Resource} to compare
* @param rhs Second {@link Resource} to compare
* @param singleType Whether to consider a single resource type or all
* resource types
* @return -1 if {@code lhs} is smaller, 0 if equal and 1 if it is larger
*/
public abstract int compare(
Resource clusterResource, Resource lhs, Resource rhs, boolean singleType);
/**
* On a cluster with capacity {@code clusterResource}, compare {@code lhs}
* and {@code rhs} considering all resources.
*
* @param clusterResource cluster capacity
* @param lhs First {@link Resource} to compare
* @param rhs Second {@link Resource} to compare
* @return -1 if {@code lhs} is smaller, 0 if equal and 1 if it is larger
*/
public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
return compare(clusterResource, lhs, rhs, false);
}
public static int divideAndCeil(int a, int b) {
if (b == 0) {

View File

@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -604,8 +603,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resource usageAfterPreemption = Resources.subtract(
getResourceUsage(), container.getAllocatedResource());
return !Resources.lessThan(fsQueue.getPolicy().getResourceCalculator(),
scheduler.getClusterResource(), usageAfterPreemption, getFairShare());
return !isUsageBelowShare(usageAfterPreemption, getFairShare());
}
/**
@ -855,8 +853,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
private boolean isReservable(Resource capacity) {
return scheduler.isAtLeastReservationThreshold(
getQueue().getPolicy().getResourceCalculator(), capacity);
// Reserve only when the app is starved and the requested container size
// is larger than the configured threshold
return isStarved() &&
scheduler.isAtLeastReservationThreshold(
getQueue().getPolicy().getResourceCalculator(), capacity);
}
private boolean hasNodeOrRackLocalRequests(SchedulerRequestKey schedulerKey) {
@ -1078,25 +1079,36 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* @return freshly computed fairshare starvation
*/
Resource fairShareStarvation() {
long now = scheduler.getClock().getTime();
Resource threshold = Resources.multiply(
getFairShare(), fsQueue.getFairSharePreemptionThreshold());
Resource starvation = Resources.componentwiseMin(threshold, demand);
Resources.subtractFromNonNegative(starvation, getResourceUsage());
Resource fairDemand = Resources.componentwiseMin(threshold, demand);
long now = scheduler.getClock().getTime();
boolean starved = !Resources.isNone(starvation);
// Check if the queue is starved for fairshare
boolean starved = isUsageBelowShare(getResourceUsage(), fairDemand);
if (!starved) {
lastTimeAtFairShare = now;
}
if (starved &&
(now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) {
this.fairshareStarvation = starvation;
if (!starved ||
now - lastTimeAtFairShare < fsQueue.getFairSharePreemptionTimeout()) {
fairshareStarvation = Resources.none();
} else {
this.fairshareStarvation = Resources.none();
// The app has been starved for longer than preemption-timeout.
fairshareStarvation =
Resources.subtractFromNonNegative(fairDemand, getResourceUsage());
}
return this.fairshareStarvation;
return fairshareStarvation;
}
/**
* Helper method that checks if {@code usage} is strictly less than
* {@code share}.
*/
private boolean isUsageBelowShare(Resource usage, Resource share) {
return fsQueue.getPolicy().getResourceCalculator().compare(
scheduler.getClusterResource(), usage, share, true) < 0;
}
ResourceRequest getNextResourceRequest() {
@ -1107,9 +1119,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* Helper method that captures if this app is identified to be starved.
* @return true if the app is starved for fairshare, false otherwise
*/
@VisibleForTesting
boolean isStarvedForFairShare() {
return !Resources.isNone(fairshareStarvation);
return isUsageBelowShare(getResourceUsage(), getFairShare());
}
/**
* Is application starved for fairshare or minshare
*/
private boolean isStarved() {
return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
}
/**
@ -1324,6 +1342,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return super.equals(o);
}
@Override
public String toString() {
return getApplicationAttemptId() + " Alloc: " + getCurrentConsumption();
}
@Override
public boolean isPreemptable() {
return getQueue().isPreemptable();

View File

@ -155,8 +155,12 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
resourceOrder1, resourceOrder2);
}
if (res == 0) {
// Apps are tied in fairness ratio. Break the tie by submit time.
res = (int)(s1.getStartTime() - s2.getStartTime());
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
if (res == 0) {
res = s1.getName().compareTo(s2.getName());
}
}
return res;
}

View File

@ -131,8 +131,9 @@ public class FairSharePolicy extends SchedulingPolicy {
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
if (res == 0)
if (res == 0) {
res = s1.getName().compareTo(s2.getName());
}
}
return res;
}

View File

@ -116,6 +116,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.xml.sax.SAXException;
@ -2619,71 +2620,58 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
}
/**
* Reserve at a lower priority and verify the lower priority request gets
* allocated
*/
@Test (timeout = 5000)
public void testReservationWhileMultiplePriorities() throws IOException {
public void testReservationWithMultiplePriorities() throws IOException {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(1024, 4), 1, "127.0.0.1");
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
ApplicationAttemptId attId = createSchedulingRequest(1024, 4, "queue1",
"user1", 1, 2);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
assertEquals(1, app.getLiveContainers().size());
ContainerId containerId = scheduler.getSchedulerApp(attId)
.getLiveContainers().iterator().next().getContainerId();
// Cause reservation to be created
createSchedulingRequestExistingApplication(1024, 4, 2, attId);
// Create first app and take up half resources so the second app that asks
// for the entire node won't have enough.
FSAppAttempt app1 = scheduler.getSchedulerApp(
createSchedulingRequest(1024, 1, "queue", "user", 1));
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Basic allocation failed", 1, app1.getLiveContainers().size());
assertEquals(1, app.getLiveContainers().size());
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
// Create request at higher priority
createSchedulingRequestExistingApplication(1024, 4, 1, attId);
// Create another app and reserve at a lower priority first
ApplicationAttemptId attId =
createSchedulingRequest(2048, 2, "queue1", "user1", 1, 2);
FSAppAttempt app2 = scheduler.getSchedulerApp(attId);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Reservation at lower priority failed",
1, app2.getReservedContainers().size());
assertEquals(1, app.getLiveContainers().size());
// Reserved container should still be at lower priority
for (RMContainer container : app.getReservedContainers()) {
assertEquals(2,
container.getReservedSchedulerKey().getPriority().getPriority());
}
// Request container on the second app at a higher priority
createSchedulingRequestExistingApplication(2048, 2, 1, attId);
// Complete container
scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
// Complete the first container so we can trigger allocation for app2
ContainerId containerId =
app1.getLiveContainers().iterator().next().getContainerId();
scheduler.allocate(app1.getApplicationAttemptId(),
new ArrayList<ResourceRequest>(),
Arrays.asList(containerId), null, null, null, null);
assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
// Schedule at opening
scheduler.update();
// Trigger allocation for app2
scheduler.handle(updateEvent);
// Reserved container (at lower priority) should be run
Collection<RMContainer> liveContainers = app.getLiveContainers();
assertEquals(1, liveContainers.size());
for (RMContainer liveContainer : liveContainers) {
Assert.assertEquals(2, liveContainer.getContainer().getPriority()
.getPriority());
}
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
Collection<RMContainer> liveContainers = app2.getLiveContainers();
assertEquals("Allocation post completion failed", 1, liveContainers.size());
assertEquals("High prio container allocated against low prio reservation",
2, liveContainers.iterator().next().getContainer().
getPriority().getPriority());
}
@Test
@ -3213,8 +3201,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
/**
* If we update our ask to strictly request a node, it doesn't make sense to keep
* a reservation on another.
* Strict locality requests shouldn't reserve resources on another node.
*/
@Test
public void testReservationsStrictLocality() throws IOException {
@ -3222,40 +3209,39 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
// Add two nodes
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 1));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 1));
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
"user1", 0);
// Submit application without container requests
ApplicationAttemptId attId =
createSchedulingRequest(1024, "queue1", "user1", 0);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
1, 2, false);
// Request a container on node2
ResourceRequest nodeRequest =
createResourceRequest(1024, node2.getHostName(), 1, 1, true);
ResourceRequest rackRequest =
createResourceRequest(1024, "rack1", 1, 1, false);
ResourceRequest anyRequest =
createResourceRequest(1024, ResourceRequest.ANY, 1, 1, false);
createSchedulingRequestExistingApplication(nodeRequest, attId);
createSchedulingRequestExistingApplication(rackRequest, attId);
createSchedulingRequestExistingApplication(anyRequest, attId);
scheduler.update();
// Heartbeat from node1. App shouldn't get an allocation or reservation
NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdateEvent);
assertEquals(1, app.getLiveContainers().size());
assertEquals("App assigned a container on the wrong node",
0, app.getLiveContainers().size());
scheduler.handle(nodeUpdateEvent);
assertEquals(1, app.getReservedContainers().size());
// now, make our request node-specific (on a different node)
rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
1, 1, false);
scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
new ArrayList<ContainerId>(), null, null, null, null);
scheduler.handle(nodeUpdateEvent);
assertEquals(0, app.getReservedContainers().size());
assertEquals("App reserved a container on the wrong node",
0, app.getReservedContainers().size());
}
@Test

View File

@ -72,7 +72,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
{"MinSharePreemptionWithDRF", 1},
{"FairSharePreemption", 2},
{"FairSharePreemptionWithDRF", 3}
});
});
}
public TestFairSchedulerPreemption(String name, int mode)
@ -110,6 +110,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
* |--- preemptable
* |--- child-1
* |--- child-2
* |--- preemptable-sibling
* |--- nonpreemptible
* |--- child-1
* |--- child-2
@ -133,6 +134,10 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
out.println("</queue>"); // end of preemptable queue
out.println("<queue name=\"preemptable-sibling\">");
writePreemptionParams(out);
out.println("</queue>");
// Queue with preemption disallowed
out.println("<queue name=\"nonpreemptable\">");
out.println("<allowPreemptionFrom>false" +
@ -269,10 +274,11 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
preemptHalfResources(queue2);
}
private void verifyPreemption() throws InterruptedException {
private void verifyPreemption(int numStarvedAppContainers)
throws InterruptedException {
// Sleep long enough for four containers to be preempted.
for (int i = 0; i < 1000; i++) {
if (greedyApp.getLiveContainers().size() == 4) {
if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) {
break;
}
Thread.sleep(10);
@ -280,13 +286,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
// Verify the right amount of containers are preempted from greedyApp
assertEquals("Incorrect number of containers on the greedy app",
4, greedyApp.getLiveContainers().size());
2 * numStarvedAppContainers, greedyApp.getLiveContainers().size());
sendEnoughNodeUpdatesToAssignFully();
// Verify the preempted containers are assigned to starvingApp
assertEquals("Starved app is not assigned the right number of containers",
2, starvingApp.getLiveContainers().size());
numStarvedAppContainers, starvingApp.getLiveContainers().size());
}
private void verifyNoPreemption() throws InterruptedException {
@ -305,7 +311,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
String queue = "root.preemptable.child-1";
submitApps(queue, queue);
if (fairsharePreemption) {
verifyPreemption();
verifyPreemption(2);
} else {
verifyNoPreemption();
}
@ -314,13 +320,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@Test
public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.preemptable.child-2");
verifyPreemption();
verifyPreemption(2);
}
@Test
public void testPreemptionBetweenNonSiblingQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
verifyPreemption();
verifyPreemption(2);
}
@Test
@ -354,7 +360,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
setNumAMContainersPerNode(2);
preemptHalfResources("root.preemptable.child-2");
verifyPreemption();
verifyPreemption(2);
ArrayList<RMContainer> containers =
(ArrayList<RMContainer>) starvingApp.getLiveContainers();
@ -365,4 +371,24 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
assertTrue("Preempted containers should come from two different "
+ "nodes.", !host0.equals(host1));
}
@Test
public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
throws InterruptedException {
// Run this test only for fairshare preemption
if (!fairsharePreemption) {
return;
}
// Let one of the child queues take over the entire cluster
takeAllResources("root.preemptable.child-1");
// Submit a job so half the resources go to parent's sibling
preemptHalfResources("root.preemptable-sibling");
verifyPreemption(2);
// Submit a job to the child's sibling to force preemption from the child
preemptHalfResources("root.preemptable.child-2");
verifyPreemption(1);
}
}