YARN-3920. FairScheduler container reservation on a node should be configurable to limit it to large containers (adhoot via asuresh)
This commit is contained in:
parent
602335dfe6
commit
94dec5a916
|
@ -454,6 +454,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup
|
MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup
|
||||||
failure during commitJob. (Junping Du via wangda)
|
failure during commitJob. (Junping Du via wangda)
|
||||||
|
|
||||||
|
YARN-3920. FairScheduler container reservation on a node should be
|
||||||
|
configurable to limit it to large containers (adhoot via asuresh)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -543,10 +543,23 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
return container.getResource();
|
return container.getResource();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isReservable(container)) {
|
||||||
// The desired container won't fit here, so reserve
|
// The desired container won't fit here, so reserve
|
||||||
reserve(request.getPriority(), node, container, reserved);
|
reserve(request.getPriority(), node, container, reserved);
|
||||||
|
|
||||||
return FairScheduler.CONTAINER_RESERVED;
|
return FairScheduler.CONTAINER_RESERVED;
|
||||||
|
} else {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Not creating reservation as container " + container.getId()
|
||||||
|
+ " is not reservable");
|
||||||
|
}
|
||||||
|
return Resources.none();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isReservable(Container container) {
|
||||||
|
return scheduler.isAtLeastReservationThreshold(
|
||||||
|
getQueue().getPolicy().getResourceCalculator(), container.getResource());
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean hasNodeOrRackLocalRequests(Priority priority) {
|
private boolean hasNodeOrRackLocalRequests(Priority priority) {
|
||||||
|
|
|
@ -195,6 +195,10 @@ public class FairScheduler extends
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
AllocationConfiguration allocConf;
|
AllocationConfiguration allocConf;
|
||||||
|
|
||||||
|
// Container size threshold for making a reservation.
|
||||||
|
@VisibleForTesting
|
||||||
|
Resource reservationThreshold;
|
||||||
|
|
||||||
public FairScheduler() {
|
public FairScheduler() {
|
||||||
super(FairScheduler.class.getName());
|
super(FairScheduler.class.getName());
|
||||||
clock = new SystemClock();
|
clock = new SystemClock();
|
||||||
|
@ -203,6 +207,12 @@ public class FairScheduler extends
|
||||||
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
|
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAtLeastReservationThreshold(
|
||||||
|
ResourceCalculator resourceCalculator, Resource resource) {
|
||||||
|
return Resources.greaterThanOrEqual(
|
||||||
|
resourceCalculator, clusterResource, resource, reservationThreshold);
|
||||||
|
}
|
||||||
|
|
||||||
private void validateConf(Configuration conf) {
|
private void validateConf(Configuration conf) {
|
||||||
// validate scheduler memory allocation setting
|
// validate scheduler memory allocation setting
|
||||||
int minMem = conf.getInt(
|
int minMem = conf.getInt(
|
||||||
|
@ -1325,6 +1335,7 @@ public class FairScheduler extends
|
||||||
minimumAllocation = this.conf.getMinimumAllocation();
|
minimumAllocation = this.conf.getMinimumAllocation();
|
||||||
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
||||||
incrAllocation = this.conf.getIncrementAllocation();
|
incrAllocation = this.conf.getIncrementAllocation();
|
||||||
|
updateReservationThreshold();
|
||||||
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
||||||
continuousSchedulingSleepMs =
|
continuousSchedulingSleepMs =
|
||||||
this.conf.getContinuousSchedulingSleepMs();
|
this.conf.getContinuousSchedulingSleepMs();
|
||||||
|
@ -1391,6 +1402,14 @@ public class FairScheduler extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateReservationThreshold() {
|
||||||
|
Resource newThreshold = Resources.multiply(
|
||||||
|
getIncrementResourceCapability(),
|
||||||
|
this.conf.getReservationThresholdIncrementMultiple());
|
||||||
|
|
||||||
|
reservationThreshold = newThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void startSchedulerThreads() {
|
private synchronized void startSchedulerThreads() {
|
||||||
Preconditions.checkNotNull(updateThread, "updateThread is null");
|
Preconditions.checkNotNull(updateThread, "updateThread is null");
|
||||||
Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
|
Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -50,6 +48,16 @@ public class FairSchedulerConfiguration extends Configuration {
|
||||||
YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores";
|
YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores";
|
||||||
public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1;
|
public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1;
|
||||||
|
|
||||||
|
/** Threshold for container size for making a container reservation as a
|
||||||
|
* multiple of increment allocation. Only container sizes above this are
|
||||||
|
* allowed to reserve a node */
|
||||||
|
public static final String
|
||||||
|
RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE =
|
||||||
|
YarnConfiguration.YARN_PREFIX +
|
||||||
|
"scheduler.reservation-threshold.increment-multiple";
|
||||||
|
public static final float
|
||||||
|
DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE = 2f;
|
||||||
|
|
||||||
private static final String CONF_PREFIX = "yarn.scheduler.fair.";
|
private static final String CONF_PREFIX = "yarn.scheduler.fair.";
|
||||||
|
|
||||||
public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
|
public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
|
||||||
|
@ -167,6 +175,12 @@ public class FairSchedulerConfiguration extends Configuration {
|
||||||
return Resources.createResource(incrementMemory, incrementCores);
|
return Resources.createResource(incrementMemory, incrementCores);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public float getReservationThresholdIncrementMultiple() {
|
||||||
|
return getFloat(
|
||||||
|
RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
|
||||||
|
DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE);
|
||||||
|
}
|
||||||
|
|
||||||
public float getLocalityThresholdNode() {
|
public float getLocalityThresholdNode() {
|
||||||
return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
|
return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,7 @@ public class FairSchedulerTestBase {
|
||||||
protected Configuration conf;
|
protected Configuration conf;
|
||||||
protected FairScheduler scheduler;
|
protected FairScheduler scheduler;
|
||||||
protected ResourceManager resourceManager;
|
protected ResourceManager resourceManager;
|
||||||
|
public static final float TEST_RESERVATION_THRESHOLD = 0.09f;
|
||||||
|
|
||||||
// Helper methods
|
// Helper methods
|
||||||
public Configuration createConfiguration() {
|
public Configuration createConfiguration() {
|
||||||
|
@ -76,6 +77,11 @@ public class FairSchedulerTestBase {
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
|
||||||
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
|
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
|
||||||
conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
|
conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
|
||||||
|
|
||||||
|
conf.setFloat(
|
||||||
|
FairSchedulerConfiguration
|
||||||
|
.RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
|
||||||
|
TEST_RESERVATION_THRESHOLD);
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -710,7 +710,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
// Asked for less than increment allocation.
|
// Asked for less than increment allocation.
|
||||||
assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
|
assertEquals(
|
||||||
|
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
|
||||||
scheduler.getQueueManager().getQueue("queue1").
|
scheduler.getQueueManager().getQueue("queue1").
|
||||||
getResourceUsage().getMemory());
|
getResourceUsage().getMemory());
|
||||||
|
|
||||||
|
@ -939,8 +940,88 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
getResourceUsage().getMemory());
|
getResourceUsage().getMemory());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReservationThresholdGatesReservations() throws Exception {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<defaultQueueSchedulingPolicy>drf" +
|
||||||
|
"</defaultQueueSchedulingPolicy>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
// Set threshold to 2 * 1024 ==> 2048 MB & 2 * 1 ==> 2 vcores (test will
|
||||||
|
// use vcores)
|
||||||
|
conf.setFloat(FairSchedulerConfiguration.
|
||||||
|
RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
|
||||||
|
2f);
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// Add a node
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes
|
||||||
|
.newNodeInfo(1, Resources.createResource(4096, 4), 1, "127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
|
||||||
|
// Queue 1 requests full capacity of node
|
||||||
|
createSchedulingRequest(4096, 4, "queue1", "user1", 1, 1);
|
||||||
|
scheduler.update();
|
||||||
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
||||||
|
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// Make sure queue 1 is allocated app capacity
|
||||||
|
assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
|
||||||
|
getResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// Now queue 2 requests below threshold
|
||||||
|
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// Make sure queue 2 has no reservation
|
||||||
|
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
|
||||||
|
getResourceUsage().getMemory());
|
||||||
|
assertEquals(0,
|
||||||
|
scheduler.getSchedulerApp(attId).getReservedContainers().size());
|
||||||
|
|
||||||
|
// Now queue requests CPU above threshold
|
||||||
|
createSchedulingRequestExistingApplication(1024, 3, 1, attId);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// Make sure queue 2 is waiting with a reservation
|
||||||
|
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
|
||||||
|
getResourceUsage().getMemory());
|
||||||
|
assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
|
||||||
|
.getVirtualCores());
|
||||||
|
|
||||||
|
// Now another node checks in with capacity
|
||||||
|
RMNode node2 =
|
||||||
|
MockNodes
|
||||||
|
.newNodeInfo(1, Resources.createResource(1024, 4), 2, "127.0.0.2");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeEvent2);
|
||||||
|
scheduler.handle(updateEvent2);
|
||||||
|
|
||||||
|
// Make sure this goes to queue 2
|
||||||
|
assertEquals(3, scheduler.getQueueManager().getQueue("queue2").
|
||||||
|
getResourceUsage().getVirtualCores());
|
||||||
|
|
||||||
|
// The old reservation should still be there...
|
||||||
|
assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
|
||||||
|
.getVirtualCores());
|
||||||
|
// ... but it should disappear when we update the first node.
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
assertEquals(0, scheduler.getSchedulerApp(attId).getCurrentReservation()
|
||||||
|
.getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyQueueName() throws Exception {
|
public void testEmptyQueueName() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue