YARN-4270. Limit application resource reservation on nodes for non-node/rack specific requests (asuresh)

(cherry picked from commit 7e2837f830382835838c82398db6fc9823d612a7)
This commit is contained in:
Arun Suresh 2015-10-19 20:00:38 -07:00
parent c35771c46e
commit acc0e718d6
5 changed files with 327 additions and 14 deletions

View File

@ -915,6 +915,9 @@ Release 2.8.0 - UNRELEASED
YARN-4155. TestLogAggregationService.testLogAggregationServiceWithInterval failing
(Bibin A Chundatt via stevel)
YARN-4270. Limit application resource reservation on nodes for non-node/rack
specific requests (asuresh)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -19,10 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -52,6 +54,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -78,6 +81,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
private RMContainerComparator comparator = new RMContainerComparator();
private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
// Used to record node reservation by an app.
// Key = RackName, Value = Set of Nodes reserved by app on rack
private Map<String, Set<String>> reservations = new HashMap<>();
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To achieve this
@ -446,22 +453,53 @@ public Container createContainer(
* in {@link FSSchedulerNode}..
*/
private void reserve(Priority priority, FSSchedulerNode node,
Container container, boolean alreadyReserved) {
LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + getApplicationId());
Container container, NodeType type, boolean alreadyReserved) {
if (!alreadyReserved) {
getMetrics().reserveResource(getUser(), container.getResource());
RMContainer rmContainer =
super.reserve(node, priority, null, container);
node.reserveResource(this, priority, rmContainer);
} else {
RMContainer rmContainer = node.getReservedContainer();
super.reserve(node, priority, rmContainer, container);
node.reserveResource(this, priority, rmContainer);
if (!reservationExceedsThreshold(node, type)) {
LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + getApplicationId());
if (!alreadyReserved) {
getMetrics().reserveResource(getUser(), container.getResource());
RMContainer rmContainer =
super.reserve(node, priority, null, container);
node.reserveResource(this, priority, rmContainer);
setReservation(node);
} else {
RMContainer rmContainer = node.getReservedContainer();
super.reserve(node, priority, rmContainer, container);
node.reserveResource(this, priority, rmContainer);
setReservation(node);
}
}
}
private boolean reservationExceedsThreshold(FSSchedulerNode node,
NodeType type) {
// Only if not node-local
if (type != NodeType.NODE_LOCAL) {
int existingReservations = getNumReservations(node.getRackName(),
type == NodeType.OFF_SWITCH);
int totalAvailNodes =
(type == NodeType.OFF_SWITCH) ? scheduler.getNumClusterNodes() :
scheduler.getNumNodesInRack(node.getRackName());
int numAllowedReservations =
(int)Math.ceil(
totalAvailNodes * scheduler.getReservableNodesRatio());
if (existingReservations >= numAllowedReservations) {
DecimalFormat df = new DecimalFormat();
df.setMaximumFractionDigits(2);
LOG.info("Reservation Exceeds Allowed number of nodes:" +
" app_id=" + getApplicationId() +
" existingReservations=" + existingReservations +
" totalAvailableNodes=" + totalAvailNodes +
" reservableNodesRatio=" + df.format(
scheduler.getReservableNodesRatio()) +
" numAllowedReservations=" + numAllowedReservations);
return true;
}
}
return false;
}
/**
* Remove the reservation on {@code node} at the given {@link Priority}.
* This dispatches SchedulerNode handlers as well.
@ -470,10 +508,47 @@ public void unreserve(Priority priority, FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
unreserveInternal(priority, node);
node.unreserveResource(this);
clearReservation(node);
getMetrics().unreserveResource(
getUser(), rmContainer.getContainer().getResource());
}
private synchronized void setReservation(SchedulerNode node) {
String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations == null) {
rackReservations = new HashSet<>();
reservations.put(rackName, rackReservations);
}
rackReservations.add(node.getNodeName());
}
private synchronized void clearReservation(SchedulerNode node) {
String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations != null) {
rackReservations.remove(node.getNodeName());
}
}
int getNumReservations(String rackName, boolean isAny) {
int counter = 0;
if (isAny) {
for (Set<String> nodes : reservations.values()) {
if (nodes != null) {
counter += nodes.size();
}
}
} else {
Set<String> nodes = reservations.get(
rackName == null ? "NULL" : rackName);
if (nodes != null) {
counter += nodes.size();
}
}
return counter;
}
/**
* Assign a container to this node to facilitate {@code request}. If node does
* not have enough memory, create a reservation. This is called once we are
@ -545,7 +620,7 @@ private Resource assignContainer(
if (isReservable(container)) {
// The desired container won't fit here, so reserve
reserve(request.getPriority(), node, container, reserved);
reserve(request.getPriority(), node, container, type, reserved);
return FairScheduler.CONTAINER_RESERVED;
} else {

View File

@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -180,7 +181,13 @@ public class FairScheduler extends
// Containers whose AMs have been warned that they will be preempted soon.
private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
private float reservableNodesRatio; // percentage of available nodes
// an app can be reserved on
// Count of number of nodes per rack
private Map<String, Integer> nodesPerRack = new ConcurrentHashMap<>();
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@ -263,6 +270,14 @@ public FairSchedulerConfiguration getConf() {
return conf;
}
public int getNumNodesInRack(String rackName) {
String rName = rackName == null ? "NULL" : rackName;
if (nodesPerRack.containsKey(rName)) {
return nodesPerRack.get(rName);
}
return 0;
}
public QueueManager getQueueManager() {
return queueMgr;
}
@ -870,6 +885,12 @@ private synchronized void addNode(List<NMContainerStatus> containerReports,
RMNode node) {
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
nodes.put(node.getNodeID(), schedulerNode);
String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
if (nodesPerRack.containsKey(rackName)) {
nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1);
} else {
nodesPerRack.put(rackName, 1);
}
Resources.addTo(clusterResource, node.getTotalCapability());
updateMaximumAllocation(schedulerNode, true);
@ -916,6 +937,14 @@ private synchronized void removeNode(RMNode rmNode) {
}
nodes.remove(rmNode.getNodeID());
String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
if (nodesPerRack.containsKey(rackName)
&& (nodesPerRack.get(rackName) > 0)) {
nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1);
} else {
LOG.error("Node [" + rmNode.getNodeAddress() + "] being removed from" +
" unknown rack [" + rackName + "] !!");
}
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
updateMaximumAllocation(node, false);
@ -1367,6 +1396,7 @@ private void initScheduler(Configuration conf) throws IOException {
preemptionInterval = this.conf.getPreemptionInterval();
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
usePortForNodeName = this.conf.getUsePortForNodeName();
reservableNodesRatio = this.conf.getReservableNodes();
updateInterval = this.conf.getUpdateInterval();
if (updateInterval < 0) {
@ -1747,4 +1777,8 @@ protected void decreaseContainer(
SchedulerApplicationAttempt attempt) {
// TODO Auto-generated method stub
}
public float getReservableNodesRatio() {
return reservableNodesRatio;
}
}

View File

@ -137,6 +137,11 @@ public class FairSchedulerConfiguration extends Configuration {
CONF_PREFIX + "update-interval-ms";
public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;
/** Ratio of nodes available for an app to make an reservation on. */
public static final String RESERVABLE_NODES =
CONF_PREFIX + "reservable-nodes";
public static final float RESERVABLE_NODES_DEFAULT = 0.05f;
public FairSchedulerConfiguration() {
super();
}
@ -247,6 +252,10 @@ public boolean getUsePortForNodeName() {
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
}
public float getReservableNodes() {
return getFloat(RESERVABLE_NODES, RESERVABLE_NODES_DEFAULT);
}
/**
* Parses a resource config value of a form like "1024", "1024 mb",
* or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.

View File

@ -760,6 +760,7 @@ public void testSimpleContainerReservation() throws Exception {
// Now queue 2 requests likewise
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
scheduler.update();
scheduler.handle(updateEvent);
@ -789,6 +790,197 @@ public void testSimpleContainerReservation() throws Exception {
}
@Test (timeout = 5000)
public void testOffSwitchAppReservationThreshold() throws Exception {
conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add three node
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
MockNodes
.newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
MockNodes
.newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
// Ensure capacity on all nodes are allocated
createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node3));
// Verify capacity allocation
assertEquals(6144, scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
// Create new app with a resource request that can be satisfied by any
// node but would be
ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
assertEquals(1,
scheduler.getSchedulerApp(attId).getNumReservations(null, true));
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
assertEquals(2,
scheduler.getSchedulerApp(attId).getNumReservations(null, true));
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node3));
// No new reservations should happen since it exceeds threshold
assertEquals(2,
scheduler.getSchedulerApp(attId).getNumReservations(null, true));
// Add 1 more node
RMNode node4 =
MockNodes
.newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.4");
NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
scheduler.handle(nodeEvent4);
// New node satisfies resource request
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node4));
assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
scheduler.handle(new NodeUpdateSchedulerEvent(node3));
scheduler.update();
// Verify number of reservations have decremented
assertEquals(0,
scheduler.getSchedulerApp(attId).getNumReservations(null, true));
}
@Test (timeout = 5000)
public void testRackLocalAppReservationThreshold() throws Exception {
conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add four node
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// These 3 on different rack
RMNode node2 =
MockNodes
.newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
MockNodes
.newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
RMNode node4 =
MockNodes
.newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.4");
NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
scheduler.handle(nodeEvent4);
// Ensure capacity on all nodes are allocated
createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node3));
createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node4));
// Verify capacity allocation
assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
// Create new app with a resource request that can be satisfied by any
// node but would be
ApplicationAttemptId attemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(attemptId);
scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
false);
scheduler.addApplicationAttempt(attemptId, false, false);
List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
null, null, null);
ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
assertEquals(1,
scheduler.getSchedulerApp(attId).getNumReservations(null, true));
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
assertEquals(2,
scheduler.getSchedulerApp(attId).getNumReservations(null, true));
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node3));
// No new reservations should happen since it exceeds threshold
assertEquals(2,
scheduler.getSchedulerApp(attId).getNumReservations(null, true));
// Add 1 more node
RMNode node5 =
MockNodes
.newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.4");
NodeAddedSchedulerEvent nodeEvent5 = new NodeAddedSchedulerEvent(node5);
scheduler.handle(nodeEvent5);
// New node satisfies resource request
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node4));
assertEquals(10240, scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
scheduler.handle(new NodeUpdateSchedulerEvent(node3));
scheduler.handle(new NodeUpdateSchedulerEvent(node4));
scheduler.update();
// Verify number of reservations have decremented
assertEquals(0,
scheduler.getSchedulerApp(attId).getNumReservations(null, true));
}
@Test (timeout = 500000)
public void testContainerReservationAttemptExceedingQueueMax()
throws Exception {