YARN-5035. FairScheduler: Adjust maxAssign dynamically when assignMultiple is turned on. (kasha)

(cherry picked from commit 04ded558b0)
(cherry picked from commit 59335b4d7a)
Karthik Kambatla 2016-05-26 14:41:07 -07:00
parent 2e755a7f0e
commit 427e3f995e
4 changed files with 98 additions and 9 deletions

FairScheduler.java

@@ -202,6 +202,8 @@ public class FairScheduler extends
   private FairSchedulerEventLog eventLog; // Machine-readable event log
   protected boolean assignMultiple; // Allocate multiple containers per
                                     // heartbeat
+  @VisibleForTesting
+  boolean maxAssignDynamic;
   protected int maxAssign; // Max containers to assign per heartbeat
   @VisibleForTesting
@@ -1141,6 +1143,22 @@ public int compare(NodeId n1, NodeId n2) {
     }
   }
 
+  private boolean shouldContinueAssigning(int containers,
+      Resource maxResourcesToAssign, Resource assignedResource) {
+    if (!assignMultiple) {
+      return false; // assignMultiple is not enabled. Allocate one at a time.
+    }
+
+    if (maxAssignDynamic) {
+      // Using fitsIn to check if the resources assigned so far are less than
+      // or equal to max resources to assign (half of remaining resources).
+      // The "equal to" part can lead to allocating one extra container.
+      return Resources.fitsIn(assignedResource, maxResourcesToAssign);
+    } else {
+      return maxAssign <= 0 || containers < maxAssign;
+    }
+  }
+
   @VisibleForTesting
   synchronized void attemptScheduling(FSSchedulerNode node) {
     if (rmContext.isWorkPreservingRecoveryEnabled()
@@ -1169,16 +1187,22 @@ synchronized void attemptScheduling(FSSchedulerNode node) {
     if (!validReservation) {
       // No reservation, schedule at queue which is farthest below fair share
       int assignedContainers = 0;
+      Resource assignedResource = Resources.clone(Resources.none());
+      Resource maxResourcesToAssign =
+          Resources.multiply(node.getAvailableResource(), 0.5f);
       while (node.getReservedContainer() == null) {
         boolean assignedContainer = false;
-        if (!queueMgr.getRootQueue().assignContainer(node).equals(
-            Resources.none())) {
+        Resource assignment = queueMgr.getRootQueue().assignContainer(node);
+        if (!assignment.equals(Resources.none())) {
           assignedContainers++;
           assignedContainer = true;
+          Resources.addTo(assignedResource, assignment);
         }
         if (!assignedContainer) { break; }
-        if (!assignMultiple) { break; }
-        if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
+        if (!shouldContinueAssigning(assignedContainers,
+            maxResourcesToAssign, assignedResource)) {
+          break;
+        }
       }
     }
     updateRootQueueMetrics();
@@ -1404,6 +1428,7 @@ private void initScheduler(Configuration conf) throws IOException {
       preemptionUtilizationThreshold =
           this.conf.getPreemptionUtilizationThreshold();
       assignMultiple = this.conf.getAssignMultiple();
+      maxAssignDynamic = this.conf.isMaxAssignDynamic();
       maxAssign = this.conf.getMaxAssign();
       sizeBasedWeight = this.conf.getSizeBasedWeight();
       preemptionInterval = this.conf.getPreemptionInterval();
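
The effect of the new code path is easiest to see with concrete numbers. Below is a memory-only, standalone sketch of the halving rule (plain ints instead of Resource/Resources, and a hypothetical class name, so it is an illustration rather than the scheduler's code). It replays the scenario used later in testDynamicMaxAssign: a node with 8192 MB free and twelve 1024 MB requests.

// Memory-only sketch (not part of the patch) of the dynamic max-assign rule:
// a heartbeat keeps assigning while the resources handed out so far still fit
// in half of what was free when the heartbeat started. Because the check is
// "less than or equal", one extra container slips in each time.
public class DynamicMaxAssignSketch {
  public static void main(String[] args) {
    int freeMb = 8192;       // free memory on the node (8 GB, as in the test)
    int containerMb = 1024;  // each request asks for 1 GB
    int pending = 12;        // outstanding container requests

    while (freeMb >= containerMb && pending > 0) {
      int capMb = freeMb / 2; // half of the remaining resources
      int assignedMb = 0;
      int containers = 0;
      while (freeMb >= containerMb && pending > 0) {
        assignedMb += containerMb;
        freeMb -= containerMb;
        pending--;
        containers++;
        if (assignedMb > capMb) { // the fitsIn() check failed; stop this heartbeat
          break;
        }
      }
      // Prints 5, then 2, then 1 -- the counts asserted in testDynamicMaxAssign
      System.out.println(containers + " containers assigned this heartbeat");
    }
  }
}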

FairSchedulerConfiguration.java

@@ -129,6 +129,14 @@ public class FairSchedulerConfiguration extends Configuration {
   protected static final boolean DEFAULT_SIZE_BASED_WEIGHT = false;
 
   /** Maximum number of containers to assign on each check-in. */
+  public static final String DYNAMIC_MAX_ASSIGN =
+      CONF_PREFIX + "dynamic.max.assign";
+  private static final boolean DEFAULT_DYNAMIC_MAX_ASSIGN = true;
+
+  /**
+   * Specify exact number of containers to assign on each heartbeat, if dynamic
+   * max assign is turned off.
+   */
   protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
   protected static final int DEFAULT_MAX_ASSIGN = -1;
@@ -222,6 +230,10 @@ public boolean getAssignMultiple() {
     return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
   }
 
+  public boolean isMaxAssignDynamic() {
+    return getBoolean(DYNAMIC_MAX_ASSIGN, DEFAULT_DYNAMIC_MAX_ASSIGN);
+  }
+
   public int getMaxAssign() {
     return getInt(MAX_ASSIGN, DEFAULT_MAX_ASSIGN);
   }
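
To see how the two knobs combine from an operator's point of view, here is a minimal usage sketch. The property names and the two getters come from this patch; the class name MaxAssignConfigExample, the chosen values, and the surrounding main() are illustrative assumptions only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;

// Illustrative only: shows which per-heartbeat policy the scheduler ends up
// with for a given yarn-site.xml style configuration.
public class MaxAssignConfigExample {
  public static void main(String[] args) {
    Configuration raw = new Configuration();
    raw.setBoolean("yarn.scheduler.fair.assignmultiple", true);
    // Turn dynamic max-assign off and fall back to a fixed cap of 10 containers
    raw.setBoolean("yarn.scheduler.fair.dynamic.max.assign", false);
    raw.setInt("yarn.scheduler.fair.max.assign", 10);

    FairSchedulerConfiguration conf = new FairSchedulerConfiguration(raw);
    if (conf.isMaxAssignDynamic()) {
      System.out.println("Assign up to about half of the node's free resources per heartbeat");
    } else {
      System.out.println("Assign at most " + conf.getMaxAssign()
          + " containers per heartbeat (<= 0 means no limit)");
    }
  }
}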

TestFairScheduler.java

@@ -998,6 +998,7 @@ public void testReservationThresholdWithAssignMultiple() throws Exception {
     // set reservable-nodes to 0 which make reservation exceed
     conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0f);
     conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+    conf.setBoolean(FairSchedulerConfiguration.DYNAMIC_MAX_ASSIGN, false);
     scheduler.init(conf);
     scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
@@ -3193,8 +3194,9 @@ public void testFifoWithinQueue() throws Exception {
   }
 
   @Test(timeout = 3000)
-  public void testMaxAssign() throws Exception {
+  public void testFixedMaxAssign() throws Exception {
     conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+    conf.setBoolean(FairSchedulerConfiguration.DYNAMIC_MAX_ASSIGN, false);
     scheduler.init(conf);
     scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
@@ -3225,9 +3227,58 @@ public void testMaxAssign() throws Exception {
         .getLiveContainers().size());
   }
 
+  /**
+   * Test to verify the behavior of dynamic-max-assign.
+   * 1. Verify the value of maxassign doesn't affect the number of containers
+   * allocated.
+   * 2. Verify the node is fully allocated.
+   */
+  @Test(timeout = 3000)
+  public void testDynamicMaxAssign() throws Exception {
+    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(8192, 8), 0,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId attId =
+        createSchedulingRequest(1024, 1, "root.default", "user", 12);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId);
+
+    // Set maxassign to a value smaller than half the remaining resources
+    scheduler.maxAssign = 2;
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    // New container allocations should be floor(8/2) + 1 = 5
+    assertEquals("Incorrect number of containers allocated", 5,
+        app.getLiveContainers().size());
+
+    // Set maxassign to a value larger than half the remaining resources
+    scheduler.maxAssign = 4;
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    // New container allocations should be floor(3/2) + 1 = 2
+    assertEquals("Incorrect number of containers allocated", 7,
+        app.getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    // New container allocations should be 1
+    assertEquals("Incorrect number of containers allocated", 8,
+        app.getLiveContainers().size());
+  }
+
   @Test(timeout = 3000)
   public void testMaxAssignWithZeroMemoryContainers() throws Exception {
     conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+    conf.setBoolean(FairSchedulerConfiguration.DYNAMIC_MAX_ASSIGN, false);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
     scheduler.init(conf);
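
The expected counts in testDynamicMaxAssign follow directly from the halving rule sketched after the FairScheduler.java diff above. With 8 GB free, half is 4 GB, and the "equal to" case in fitsIn admits one more container, so the first heartbeat allocates 5; 3 GB then remain, so the second heartbeat allocates floor(3/2) + 1 = 2 more, for a running total of 7; the final gigabyte goes out in the third heartbeat, leaving the node fully allocated at 8 containers regardless of the maxAssign values the test sets along the way.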

FairScheduler.md

@@ -80,7 +80,8 @@ Customizing the Fair Scheduler typically involves altering two files. First, sch
 | `yarn.scheduler.fair.preemption.cluster-utilization-threshold` | The utilization threshold after which preemption kicks in. The utilization is computed as the maximum ratio of usage to capacity among all resources. Defaults to 0.8f. |
 | `yarn.scheduler.fair.sizebasedweight` | Whether to assign shares to individual apps based on their size, rather than providing an equal share to all apps regardless of size. When set to true, apps are weighted by the natural logarithm of one plus the app's total requested memory, divided by the natural logarithm of 2. Defaults to false. |
 | `yarn.scheduler.fair.assignmultiple` | Whether to allow multiple container assignments in one heartbeat. Defaults to false. |
-| `yarn.scheduler.fair.max.assign` | If assignmultiple is true, the maximum amount of containers that can be assigned in one heartbeat. Defaults to -1, which sets no limit. |
+| `yarn.scheduler.fair.dynamic.max.assign` | If assignmultiple is true, whether to dynamically determine the amount of resources that can be assigned in one heartbeat. When turned on, about half of the un-allocated resources on the node are allocated to containers in a single heartbeat. Defaults to true. |
+| `yarn.scheduler.fair.max.assign` | If assignmultiple is true and dynamic.max.assign is false, the maximum amount of containers that can be assigned in one heartbeat. Defaults to -1, which sets no limit. |
 | `yarn.scheduler.fair.locality.threshold.node` | For applications that request containers on particular nodes, the number of scheduling opportunities since the last container assignment to wait before accepting a placement on another node. Expressed as a float between 0 and 1, which, as a fraction of the cluster size, is the number of scheduling opportunities to pass up. The default value of -1.0 means don't pass up any scheduling opportunities. |
 | `yarn.scheduler.fair.locality.threshold.rack` | For applications that request containers on particular racks, the number of scheduling opportunities since the last container assignment to wait before accepting a placement on another rack. Expressed as a float between 0 and 1, which, as a fraction of the cluster size, is the number of scheduling opportunities to pass up. The default value of -1.0 means don't pass up any scheduling opportunities. |
 | `yarn.scheduler.fair.allow-undeclared-pools` | If this is true, new queues can be created at application submission time, whether because they are specified as the application's queue by the submitter or because they are placed there by the user-as-default-queue property. If this is false, any time an app would be placed in a queue that is not specified in the allocations file, it is placed in the "default" queue instead. Defaults to true. If a queue placement policy is given in the allocations file, this property is ignored. |