YARN-6210. FairScheduler: Node reservations can interfere with preemption. (kasha)
Parent: 732ee6f0b5
Commit: 718ad9f6ee
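In short: a FairScheduler app now makes a node reservation only while it is starved, and starvation is judged against a single resource type (memory for DefaultResourceCalculator, the dominant resource for DominantResourceCalculator) through a new ResourceCalculator.compare overload that takes a boolean singleType flag. A condensed sketch of that check, assembled from the changed FSAppAttempt methods shown in the diff below (not a complete class, just the relevant logic):

    // Sketch only: fsQueue, scheduler, minshareStarvation and the helper
    // methods all come from FSAppAttempt as changed in this commit.
    private boolean isUsageBelowShare(Resource usage, Resource share) {
      // Compare on a single resource type (memory or dominant resource,
      // depending on the scheduling policy's ResourceCalculator).
      return fsQueue.getPolicy().getResourceCalculator().compare(
          scheduler.getClusterResource(), usage, share, true) < 0;
    }

    private boolean isStarved() {
      return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
    }

    private boolean isReservable(Resource capacity) {
      // Reserve only when the app is starved and the requested container
      // size is at least the configured reservation threshold.
      return isStarved() &&
          scheduler.isAtLeastReservationThreshold(
              getQueue().getPolicy().getResourceCalculator(), capacity);
    }

The diff below shows the API change, the new starvation bookkeeping, and the test updates.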
@@ -30,7 +30,8 @@ public class DefaultResourceCalculator extends ResourceCalculator {
       LogFactory.getLog(DefaultResourceCalculator.class);
 
   @Override
-  public int compare(Resource unused, Resource lhs, Resource rhs) {
+  public int compare(Resource unused, Resource lhs, Resource rhs,
+      boolean singleType) {
     // Only consider memory
     return Long.compare(lhs.getMemorySize(), rhs.getMemorySize());
   }
@@ -51,17 +51,18 @@ public class DominantResourceCalculator extends ResourceCalculator {
       LogFactory.getLog(DominantResourceCalculator.class);
 
   @Override
-  public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
+  public int compare(Resource clusterResource, Resource lhs, Resource rhs,
+      boolean singleType) {
 
     if (lhs.equals(rhs)) {
       return 0;
     }
 
     if (isInvalidDivisor(clusterResource)) {
-      if ((lhs.getMemorySize() < rhs.getMemorySize() && lhs.getVirtualCores() > rhs
-          .getVirtualCores())
-          || (lhs.getMemorySize() > rhs.getMemorySize() && lhs.getVirtualCores() < rhs
-          .getVirtualCores())) {
+      if ((lhs.getMemorySize() < rhs.getMemorySize() &&
+          lhs.getVirtualCores() > rhs.getVirtualCores()) ||
+          (lhs.getMemorySize() > rhs.getMemorySize() &&
+          lhs.getVirtualCores() < rhs.getVirtualCores())) {
         return 0;
       } else if (lhs.getMemorySize() > rhs.getMemorySize()
           || lhs.getVirtualCores() > rhs.getVirtualCores()) {
@@ -79,7 +80,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
       return -1;
     } else if (l > r) {
       return 1;
-    } else {
+    } else if (!singleType) {
       l = getResourceAsValue(clusterResource, lhs, false);
       r = getResourceAsValue(clusterResource, rhs, false);
       if (l < r) {
@@ -28,8 +28,36 @@ import org.apache.hadoop.yarn.api.records.Resource;
 @Unstable
 public abstract class ResourceCalculator {
 
-  public abstract int
-  compare(Resource clusterResource, Resource lhs, Resource rhs);
+  /**
+   * On a cluster with capacity {@code clusterResource}, compare {@code lhs}
+   * and {@code rhs}. Consider all resources unless {@code singleType} is set
+   * to true. When {@code singleType} is set to true, consider only one
+   * resource as per the {@link ResourceCalculator} implementation; the
+   * {@link DefaultResourceCalculator} considers memory and
+   * {@link DominantResourceCalculator} considers the dominant resource.
+   *
+   * @param clusterResource cluster capacity
+   * @param lhs First {@link Resource} to compare
+   * @param rhs Second {@link Resource} to compare
+   * @param singleType Whether to consider a single resource type or all
+   *                   resource types
+   * @return -1 if {@code lhs} is smaller, 0 if equal and 1 if it is larger
+   */
+  public abstract int compare(
+      Resource clusterResource, Resource lhs, Resource rhs, boolean singleType);
+
+  /**
+   * On a cluster with capacity {@code clusterResource}, compare {@code lhs}
+   * and {@code rhs} considering all resources.
+   *
+   * @param clusterResource cluster capacity
+   * @param lhs First {@link Resource} to compare
+   * @param rhs Second {@link Resource} to compare
+   * @return -1 if {@code lhs} is smaller, 0 if equal and 1 if it is larger
+   */
+  public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
+    return compare(clusterResource, lhs, rhs, false);
+  }
 
   public static int divideAndCeil(int a, int b) {
     if (b == 0) {
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -605,8 +604,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     Resource usageAfterPreemption = Resources.subtract(
         getResourceUsage(), container.getAllocatedResource());
 
-    return !Resources.lessThan(fsQueue.getPolicy().getResourceCalculator(),
-        scheduler.getClusterResource(), usageAfterPreemption, getFairShare());
+    return !isUsageBelowShare(usageAfterPreemption, getFairShare());
   }
 
   /**
@@ -833,9 +831,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
 
     // The desired container won't fit here, so reserve
-    if (isReservable(capability) && reserve(
-        pendingAsk.getPerAllocationResource(), node, reservedContainer, type,
-        schedulerKey)) {
+    if (isReservable(capability) &&
+        reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
+            type, schedulerKey)) {
       if (isWaitingForAMContainer()) {
         updateAMDiagnosticMsg(capability,
             " exceed the available resources of the node and the request is"
@@ -857,7 +855,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   private boolean isReservable(Resource capacity) {
-    return scheduler.isAtLeastReservationThreshold(
-        getQueue().getPolicy().getResourceCalculator(), capacity);
+    // Reserve only when the app is starved and the requested container size
+    // is larger than the configured threshold
+    return isStarved() &&
+        scheduler.isAtLeastReservationThreshold(
+            getQueue().getPolicy().getResourceCalculator(), capacity);
   }
 
@@ -1089,34 +1090,51 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @return freshly computed fairshare starvation
    */
   Resource fairShareStarvation() {
+    long now = scheduler.getClock().getTime();
     Resource threshold = Resources.multiply(
         getFairShare(), fsQueue.getFairSharePreemptionThreshold());
-    Resource starvation = Resources.componentwiseMin(threshold, demand);
-    Resources.subtractFromNonNegative(starvation, getResourceUsage());
+    Resource fairDemand = Resources.componentwiseMin(threshold, demand);
 
-    long now = scheduler.getClock().getTime();
-    boolean starved = !Resources.isNone(starvation);
+    // Check if the queue is starved for fairshare
+    boolean starved = isUsageBelowShare(getResourceUsage(), fairDemand);
 
     if (!starved) {
       lastTimeAtFairShare = now;
     }
 
-    if (starved &&
-        (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) {
-      this.fairshareStarvation = starvation;
+    if (!starved ||
+        now - lastTimeAtFairShare < fsQueue.getFairSharePreemptionTimeout()) {
+      fairshareStarvation = Resources.none();
     } else {
-      this.fairshareStarvation = Resources.none();
+      // The app has been starved for longer than preemption-timeout.
+      fairshareStarvation =
+          Resources.subtractFromNonNegative(fairDemand, getResourceUsage());
     }
-    return this.fairshareStarvation;
+    return fairshareStarvation;
   }
 
   /**
+   * Helper method that checks if {@code usage} is strictly less than
+   * {@code share}.
+   */
+  private boolean isUsageBelowShare(Resource usage, Resource share) {
+    return fsQueue.getPolicy().getResourceCalculator().compare(
+        scheduler.getClusterResource(), usage, share, true) < 0;
+  }
+
+  /**
    * Helper method that captures if this app is identified to be starved.
    * @return true if the app is starved for fairshare, false otherwise
    */
-  @VisibleForTesting
   boolean isStarvedForFairShare() {
-    return !Resources.isNone(fairshareStarvation);
+    return isUsageBelowShare(getResourceUsage(), getFairShare());
+  }
+
+  /**
+   * Is application starved for fairshare or minshare
+   */
+  private boolean isStarved() {
+    return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
   }
 
   /**
@@ -1332,6 +1350,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return super.equals(o);
   }
 
+  @Override
+  public String toString() {
+    return getApplicationAttemptId() + " Alloc: " + getCurrentConsumption();
+  }
+
   @Override
   public boolean isPreemptable() {
     return getQueue().isPreemptable();
@@ -155,8 +155,12 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
             resourceOrder1, resourceOrder2);
       }
       if (res == 0) {
-        // Apps are tied in fairness ratio. Break the tie by submit time.
-        res = (int)(s1.getStartTime() - s2.getStartTime());
+        // Apps are tied in fairness ratio. Break the tie by submit time and job
+        // name to get a deterministic ordering, which is useful for unit tests.
+        res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+        if (res == 0) {
+          res = s1.getName().compareTo(s2.getName());
+        }
       }
       return res;
     }
@@ -131,9 +131,10 @@ public class FairSharePolicy extends SchedulingPolicy {
       // Apps are tied in fairness ratio. Break the tie by submit time and job
       // name to get a deterministic ordering, which is useful for unit tests.
       res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
-      if (res == 0)
+      if (res == 0) {
         res = s1.getName().compareTo(s2.getName());
+      }
     }
     return res;
   }
 }
@@ -119,6 +119,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.xml.sax.SAXException;
 
@@ -2627,71 +2628,57 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
   }
 
+  /**
+   * Reserve at a lower priority and verify the lower priority request gets
+   * allocated
+   */
   @Test (timeout = 5000)
-  public void testReservationWhileMultiplePriorities() throws IOException {
+  public void testReservationWithMultiplePriorities() throws IOException {
     scheduler.init(conf);
     scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
-    RMNode node1 =
-        MockNodes
-            .newNodeInfo(1, Resources.createResource(1024, 4), 1, "127.0.0.1");
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2));
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
-
-    ApplicationAttemptId attId = createSchedulingRequest(1024, 4, "queue1",
-        "user1", 1, 2);
-    scheduler.update();
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(updateEvent);
 
-    FSAppAttempt app = scheduler.getSchedulerApp(attId);
-    assertEquals(1, app.getLiveContainers().size());
-
-    ContainerId containerId = scheduler.getSchedulerApp(attId)
-        .getLiveContainers().iterator().next().getContainerId();
-
-    // Cause reservation to be created
-    createSchedulingRequestExistingApplication(1024, 4, 2, attId);
+    // Create first app and take up half resources so the second app that asks
+    // for the entire node won't have enough.
+    FSAppAttempt app1 = scheduler.getSchedulerApp(
+        createSchedulingRequest(1024, 1, "queue", "user", 1));
     scheduler.update();
     scheduler.handle(updateEvent);
+    assertEquals("Basic allocation failed", 1, app1.getLiveContainers().size());
 
-    assertEquals(1, app.getLiveContainers().size());
-    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
-    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
-
-    // Create request at higher priority
-    createSchedulingRequestExistingApplication(1024, 4, 1, attId);
+    // Create another app and reserve at a lower priority first
+    ApplicationAttemptId attId =
+        createSchedulingRequest(2048, 2, "queue1", "user1", 1, 2);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(attId);
     scheduler.update();
     scheduler.handle(updateEvent);
+    assertEquals("Reservation at lower priority failed",
+        1, app2.getReservedContainers().size());
 
-    assertEquals(1, app.getLiveContainers().size());
-    // Reserved container should still be at lower priority
-    for (RMContainer container : app.getReservedContainers()) {
-      assertEquals(2,
-          container.getReservedSchedulerKey().getPriority().getPriority());
-    }
+    // Request container on the second app at a higher priority
+    createSchedulingRequestExistingApplication(2048, 2, 1, attId);
 
-    // Complete container
-    scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
+    // Complete the first container so we can trigger allocation for app2
+    ContainerId containerId =
+        app1.getLiveContainers().iterator().next().getContainerId();
+    scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(),
         Arrays.asList(containerId), null, null, NULL_UPDATE_REQUESTS);
-    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
-    assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
 
-    // Schedule at opening
-    scheduler.update();
+    // Trigger allocation for app2
    scheduler.handle(updateEvent);
 
     // Reserved container (at lower priority) should be run
-    Collection<RMContainer> liveContainers = app.getLiveContainers();
-    assertEquals(1, liveContainers.size());
-    for (RMContainer liveContainer : liveContainers) {
-      Assert.assertEquals(2, liveContainer.getContainer().getPriority()
-          .getPriority());
-    }
-    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
-    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
+    Collection<RMContainer> liveContainers = app2.getLiveContainers();
+    assertEquals("Allocation post completion failed", 1, liveContainers.size());
+    assertEquals("High prio container allocated against low prio reservation",
+        2, liveContainers.iterator().next().getContainer().
+            getPriority().getPriority());
   }
 
   @Test
@@ -3222,8 +3209,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
   }
 
   /**
-   * If we update our ask to strictly request a node, it doesn't make sense to keep
-   * a reservation on another.
+   * Strict locality requests shouldn't reserve resources on another node.
    */
   @Test
   public void testReservationsStrictLocality() throws IOException {
@@ -3231,40 +3217,39 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
-    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
-    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
+    // Add two nodes
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 1));
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 1));
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
 
-    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
-        "user1", 0);
+    // Submit application without container requests
+    ApplicationAttemptId attId =
+        createSchedulingRequest(1024, "queue1", "user1", 0);
     FSAppAttempt app = scheduler.getSchedulerApp(attId);
 
-    ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
-    ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
-    ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
-        1, 2, false);
+    // Request a container on node2
+    ResourceRequest nodeRequest =
+        createResourceRequest(1024, node2.getHostName(), 1, 1, true);
+    ResourceRequest rackRequest =
+        createResourceRequest(1024, "rack1", 1, 1, false);
+    ResourceRequest anyRequest =
+        createResourceRequest(1024, ResourceRequest.ANY, 1, 1, false);
     createSchedulingRequestExistingApplication(nodeRequest, attId);
     createSchedulingRequestExistingApplication(rackRequest, attId);
     createSchedulingRequestExistingApplication(anyRequest, attId);
 
     scheduler.update();
 
+    // Heartbeat from node1. App shouldn't get an allocation or reservation
     NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(nodeUpdateEvent);
-    assertEquals(1, app.getLiveContainers().size());
+    assertEquals("App assigned a container on the wrong node",
+        0, app.getLiveContainers().size());
     scheduler.handle(nodeUpdateEvent);
-    assertEquals(1, app.getReservedContainers().size());
-
-    // now, make our request node-specific (on a different node)
-    rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
-    anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
-        1, 1, false);
-    scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
-        new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
-
-    scheduler.handle(nodeUpdateEvent);
-    assertEquals(0, app.getReservedContainers().size());
+    assertEquals("App reserved a container on the wrong node",
+        0, app.getReservedContainers().size());
   }
 
   @Test
@@ -110,6 +110,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
  * |--- preemptable
  *      |--- child-1
  *      |--- child-2
+ * |--- preemptable-sibling
  * |--- nonpreemptible
  *      |--- child-1
  *      |--- child-2
@@ -133,6 +134,10 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
     out.println("</queue>"); // end of preemptable queue
 
+    out.println("<queue name=\"preemptable-sibling\">");
+    writePreemptionParams(out);
+    out.println("</queue>");
+
     // Queue with preemption disallowed
     out.println("<queue name=\"nonpreemptable\">");
     out.println("<allowPreemptionFrom>false" +
@@ -269,10 +274,11 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
       preemptHalfResources(queue2);
   }
 
-  private void verifyPreemption() throws InterruptedException {
+  private void verifyPreemption(int numStarvedAppContainers)
+      throws InterruptedException {
     // Sleep long enough for four containers to be preempted.
     for (int i = 0; i < 1000; i++) {
-      if (greedyApp.getLiveContainers().size() == 4) {
+      if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) {
         break;
       }
       Thread.sleep(10);
@@ -280,13 +286,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
     // Verify the right amount of containers are preempted from greedyApp
     assertEquals("Incorrect number of containers on the greedy app",
-        4, greedyApp.getLiveContainers().size());
+        2 * numStarvedAppContainers, greedyApp.getLiveContainers().size());
 
     sendEnoughNodeUpdatesToAssignFully();
 
     // Verify the preempted containers are assigned to starvingApp
     assertEquals("Starved app is not assigned the right number of containers",
-        2, starvingApp.getLiveContainers().size());
+        numStarvedAppContainers, starvingApp.getLiveContainers().size());
   }
 
   private void verifyNoPreemption() throws InterruptedException {
|
@ -305,7 +311,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
|
||||||
String queue = "root.preemptable.child-1";
|
String queue = "root.preemptable.child-1";
|
||||||
submitApps(queue, queue);
|
submitApps(queue, queue);
|
||||||
if (fairsharePreemption) {
|
if (fairsharePreemption) {
|
||||||
verifyPreemption();
|
verifyPreemption(2);
|
||||||
} else {
|
} else {
|
||||||
verifyNoPreemption();
|
verifyNoPreemption();
|
||||||
}
|
}
|
||||||
|
@@ -314,13 +320,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
-    verifyPreemption();
+    verifyPreemption(2);
   }
 
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
     submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
-    verifyPreemption();
+    verifyPreemption(2);
   }
 
   @Test
@@ -354,7 +360,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     setNumAMContainersPerNode(2);
     preemptHalfResources("root.preemptable.child-2");
 
-    verifyPreemption();
+    verifyPreemption(2);
 
     ArrayList<RMContainer> containers =
         (ArrayList<RMContainer>) starvingApp.getLiveContainers();
@@ -365,4 +371,24 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     assertTrue("Preempted containers should come from two different "
         + "nodes.", !host0.equals(host1));
   }
+
+  @Test
+  public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
+      throws InterruptedException {
+    // Run this test only for fairshare preemption
+    if (!fairsharePreemption) {
+      return;
+    }
+
+    // Let one of the child queues take over the entire cluster
+    takeAllResources("root.preemptable.child-1");
+
+    // Submit a job so half the resources go to parent's sibling
+    preemptHalfResources("root.preemptable-sibling");
+    verifyPreemption(2);
+
+    // Submit a job to the child's sibling to force preemption from the child
+    preemptHalfResources("root.preemptable.child-2");
+    verifyPreemption(1);
+  }
 }